From fd00e8ad5dd20d8e90ee47ce5b497860400fcfe3 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 2 Apr 2025 13:33:18 +1100 Subject: [PATCH 1/3] Report expired sessions --- src/discv5.rs | 4 +++- src/handler/mod.rs | 24 ++++++++++++++++++++++++ src/lru_time_cache.rs | 14 ++++++++------ src/service.rs | 3 +++ 4 files changed, 38 insertions(+), 7 deletions(-) diff --git a/src/discv5.rs b/src/discv5.rs index 5b7d4553..90a0cf43 100644 --- a/src/discv5.rs +++ b/src/discv5.rs @@ -18,7 +18,7 @@ use crate::{ self, ConnectionDirection, ConnectionState, FailureReason, InsertResult, KBucketsTable, NodeStatus, UpdateResult, }, - node_info::NodeContact, + node_info::{NodeAddress, NodeContact}, packet::ProtocolIdentity, service::{QueryKind, Service, ServiceRequest, TalkRequest}, Config, DefaultProtocolId, Enr, IpMode, @@ -74,6 +74,8 @@ pub enum Event { }, /// A new session has been established with a node. SessionEstablished(Enr, SocketAddr), + /// A session has been removed from our cache due to inactivity. + SessionsExpired(Vec), /// Our local ENR IP address has been updated. SocketUpdated(SocketAddr), /// A node has initiated a talk request. diff --git a/src/handler/mod.rs b/src/handler/mod.rs index ac566adf..30331ee2 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -139,6 +139,8 @@ pub enum HandlerOut { socket: SocketAddr, node_id: NodeId, }, + /// These sessions have expired from the cache. + ExpiredSessions(Vec), } /// How we connected to the node. @@ -1231,6 +1233,9 @@ impl Handler { // handshake to re-establish a session, if applicable. message_nonce: Option, ) { + // Clear the session cache and report expired elements + self.remove_expired_sessions().await; + if let Some(current_session) = self.sessions.get_mut(&node_address) { current_session.update(session); // If a session is re-established, due to a new handshake during an ongoing @@ -1286,6 +1291,9 @@ impl Handler { remove_session: bool, ) { if remove_session { + // Remove expired sessions + self.remove_expired_sessions().await; + self.sessions.remove(node_address); METRICS .active_sessions @@ -1357,6 +1365,22 @@ impl Handler { .retain(|_, time| time.is_none() || Some(Instant::now()) < *time); } + /// Removes expired sessions and report them back to the service. + async fn remove_expired_sessions(&mut self) { + // Purge any expired sessions + let expired_sessions = self.sessions.remove_expired_values(); + + if !expired_sessions.is_empty() { + if let Err(e) = self + .service_send + .send(HandlerOut::ExpiredSessions(expired_sessions)) + .await + { + warn!(error = %e, "Failed to inform app of expired sessions") + } + } + } + /// Returns whether a session with this node does not exist and a request that initiates /// a session has been sent. fn is_awaiting_session_to_be_established(&mut self, node_address: &NodeAddress) -> bool { diff --git a/src/lru_time_cache.rs b/src/lru_time_cache.rs index 4eab65d2..a7d50e25 100644 --- a/src/lru_time_cache.rs +++ b/src/lru_time_cache.rs @@ -37,17 +37,14 @@ impl LruTimeCache { } /// Retrieves a reference to the value stored under `key`, or `None` if the key doesn't exist. - /// Also removes expired elements and updates the time. #[allow(dead_code)] pub fn get(&mut self, key: &K) -> Option<&V> { self.get_mut(key).map(|value| &*value) } /// Retrieves a mutable reference to the value stored under `key`, or `None` if the key doesn't exist. - /// Also removes expired elements and updates the time. pub fn get_mut(&mut self, key: &K) -> Option<&mut V> { let now = Instant::now(); - self.remove_expired_values(now); match self.map.raw_entry_mut().from_key(key) { hashlink::linked_hash_map::RawEntryMut::Occupied(mut occupied) => { @@ -76,7 +73,6 @@ impl LruTimeCache { /// Returns the size of the cache, i.e. the number of cached non-expired key-value pairs. pub fn len(&mut self) -> usize { - self.remove_expired_values(Instant::now()); self.map.len() } @@ -87,13 +83,19 @@ impl LruTimeCache { } /// Removes expired items from the cache. - fn remove_expired_values(&mut self, now: Instant) { + pub fn remove_expired_values(&mut self) -> Vec { + let mut expired_elements = Vec::new(); + let now = Instant::now(); while let Some((_front, (_value, time))) = self.map.front() { if *time + self.ttl >= now { break; } - self.map.pop_front(); + // Store the expired key + if let Some((k, _v)) = self.map.pop_front() { + expired_elements.push(k); + } } + expired_elements } } diff --git a/src/service.rs b/src/service.rs index 5062bdb4..f2640ab4 100644 --- a/src/service.rs +++ b/src/service.rs @@ -416,6 +416,9 @@ impl Service { } self.send_event(Event::UnverifiableEnr{enr, socket, node_id}); } + HandlerOut::ExpiredSessions(expired_sessions) => { + self.send_event(Event::SessionsExpired(expired_sessions)); + } } } event = Service::bucket_maintenance_poll(&self.kbuckets) => { From b34ffdd8a262551be0b06302f0c72d9b7bc95dc3 Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 2 Apr 2025 13:40:24 +1100 Subject: [PATCH 2/3] Sneaky clippy lint --- src/handler/mod.rs | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/handler/mod.rs b/src/handler/mod.rs index 30331ee2..66b2dcad 100644 --- a/src/handler/mod.rs +++ b/src/handler/mod.rs @@ -786,10 +786,10 @@ impl Handler { && match node_address.socket_addr { SocketAddr::V4(socket_addr) => enr .udp4_socket() - .map_or(true, |advertized_addr| socket_addr == advertized_addr), + .is_none_or(|advertized_addr| socket_addr == advertized_addr), SocketAddr::V6(socket_addr) => enr .udp6_socket() - .map_or(true, |advertized_addr| socket_addr == advertized_addr), + .is_none_or(|advertized_addr| socket_addr == advertized_addr), } } From 69bdd5dcc1f7c8acd51d434d392928dd433dc1af Mon Sep 17 00:00:00 2001 From: Age Manning Date: Wed, 2 Apr 2025 13:53:58 +1100 Subject: [PATCH 3/3] Remove old dead-code --- src/lru_time_cache.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/src/lru_time_cache.rs b/src/lru_time_cache.rs index a7d50e25..f14c9fdb 100644 --- a/src/lru_time_cache.rs +++ b/src/lru_time_cache.rs @@ -37,7 +37,6 @@ impl LruTimeCache { } /// Retrieves a reference to the value stored under `key`, or `None` if the key doesn't exist. - #[allow(dead_code)] pub fn get(&mut self, key: &K) -> Option<&V> { self.get_mut(key).map(|value| &*value) } @@ -58,7 +57,6 @@ impl LruTimeCache { /// Returns a reference to the value with the given `key`, if present and not expired, without /// updating the timestamp. - #[allow(dead_code)] pub fn peek(&self, key: &K) -> Option<&V> { if let Some((value, time)) = self.map.get(key) { return if *time + self.ttl >= Instant::now() {