Skip to content

Commit 50b1a5b

Browse files
start removing ping logic from the node_map
1 parent 182f78c commit 50b1a5b

File tree

6 files changed

+318
-1168
lines changed

6 files changed

+318
-1168
lines changed

iroh/src/magicsock.rs

Lines changed: 10 additions & 119 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use std::{
2929
};
3030

3131
use bytes::Bytes;
32-
use data_encoding::HEXLOWER;
3332
use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey};
3433
use iroh_relay::RelayMap;
3534
use n0_future::{
@@ -59,7 +58,7 @@ use url::Url;
5958
use self::transports::IpTransport;
6059
use self::{
6160
metrics::Metrics as MagicsockMetrics,
62-
node_map::{NodeMap, PingAction, PingRole, SendPing},
61+
node_map::{NodeMap, PingAction},
6362
transports::{RelayActorConfig, RelayTransport, Transports, UdpSender},
6463
};
6564
#[cfg(not(wasm_browser))]
@@ -880,13 +879,8 @@ impl MagicSock {
880879
let _guard = span.enter();
881880
trace!("receive disco message");
882881
match dm {
883-
disco::Message::Ping(ping) => {
884-
self.metrics.magicsock.recv_disco_ping.inc();
885-
self.handle_ping(ping, sender, src);
886-
}
887-
disco::Message::Pong(pong) => {
888-
self.metrics.magicsock.recv_disco_pong.inc();
889-
self.node_map.handle_pong(sender, src, pong);
882+
disco::Message::Ping(..) | disco::Message::Pong(..) => {
883+
unreachable!("not used anymore");
890884
}
891885
disco::Message::CallMeMaybe(cm) => {
892886
self.metrics.magicsock.recv_disco_call_me_maybe.inc();
@@ -913,99 +907,19 @@ impl MagicSock {
913907
direct_addresses: cm.my_numbers.iter().copied().collect(),
914908
});
915909

916-
let ping_actions =
917-
self.node_map
918-
.handle_call_me_maybe(sender, cm, &self.metrics.magicsock);
919-
920-
for action in ping_actions {
921-
match action {
922-
PingAction::SendCallMeMaybe { .. } => {
923-
warn!("Unexpected CallMeMaybe as response of handling a CallMeMaybe");
924-
}
925-
PingAction::SendPing(ping) => {
926-
self.send_ping_queued(ping);
927-
}
928-
}
929-
}
910+
self.node_map
911+
.handle_call_me_maybe(sender, cm, &self.metrics.magicsock);
930912
}
931913
}
932914
trace!("disco message handled");
933915
}
934916

935-
/// Handle a ping message.
936-
fn handle_ping(&self, dm: disco::Ping, sender: NodeId, src: &transports::Addr) {
937-
// Insert the ping into the node map, and return whether a ping with this tx_id was already
938-
// received.
939-
let addr: SendAddr = src.clone().into();
940-
let handled = self.node_map.handle_ping(sender, addr.clone(), dm.tx_id);
941-
match handled.role {
942-
PingRole::Duplicate => {
943-
debug!(?src, tx = %HEXLOWER.encode(&dm.tx_id), "received ping: path already confirmed, skip");
944-
return;
945-
}
946-
PingRole::LikelyHeartbeat => {}
947-
PingRole::NewPath => {
948-
debug!(?src, tx = %HEXLOWER.encode(&dm.tx_id), "received ping: new path");
949-
}
950-
PingRole::Activate => {
951-
debug!(?src, tx = %HEXLOWER.encode(&dm.tx_id), "received ping: path active");
952-
}
953-
}
954-
955-
// Send a pong.
956-
debug!(tx = %HEXLOWER.encode(&dm.tx_id), %addr, dstkey = %sender.fmt_short(),
957-
"sending pong");
958-
let pong = disco::Message::Pong(disco::Pong {
959-
tx_id: dm.tx_id,
960-
ping_observed_addr: addr.clone(),
961-
});
962-
event!(
963-
target: "iroh::_events::pong::sent",
964-
Level::DEBUG,
965-
remote_node = %sender.fmt_short(),
966-
dst = ?addr,
967-
txn = ?dm.tx_id,
968-
);
969-
970-
if !self.disco.try_send(addr.clone(), sender, pong) {
971-
warn!(%addr, "failed to queue pong");
972-
}
973-
974-
if let Some(ping) = handled.needs_ping_back {
975-
debug!(
976-
%addr,
977-
dstkey = %sender.fmt_short(),
978-
"sending direct ping back",
979-
);
980-
self.send_ping_queued(ping);
981-
}
982-
}
983-
984-
fn send_ping_queued(&self, ping: SendPing) {
985-
let SendPing {
986-
id,
987-
dst,
988-
dst_node,
989-
tx_id,
990-
purpose,
991-
} = ping;
992-
let msg = disco::Message::Ping(disco::Ping {
993-
tx_id,
994-
node_key: self.public_key,
995-
});
996-
let sent = self.disco.try_send(dst.clone(), dst_node, msg);
997-
if sent {
998-
let msg_sender = self.actor_sender.clone();
999-
trace!(%dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "ping sent (queued)");
1000-
self.node_map
1001-
.notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
1002-
} else {
1003-
warn!(dst = ?dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "failed to send ping: queues full");
1004-
}
1005-
}
1006-
1007917
/// Send the given ping actions out.
1008-
async fn send_ping_actions(&self, sender: &UdpSender, msgs: Vec<PingAction>) -> io::Result<()> {
918+
async fn send_ping_actions(
919+
&self,
920+
_sender: &UdpSender,
921+
msgs: Vec<PingAction>,
922+
) -> io::Result<()> {
1009923
for msg in msgs {
1010924
// Abort sending as soon as we know we are shutting down.
1011925
if self.is_closing() || self.is_closed() {
@@ -1050,25 +964,6 @@ impl MagicSock {
1050964
}
1051965
}
1052966
}
1053-
PingAction::SendPing(SendPing {
1054-
id,
1055-
dst,
1056-
dst_node,
1057-
tx_id,
1058-
purpose,
1059-
}) => {
1060-
let msg = disco::Message::Ping(disco::Ping {
1061-
tx_id,
1062-
node_key: self.public_key,
1063-
});
1064-
1065-
self.send_disco_message(sender, dst.clone(), dst_node, msg)
1066-
.await?;
1067-
debug!(%dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "ping sent");
1068-
let msg_sender = self.actor_sender.clone();
1069-
self.node_map
1070-
.notify_ping_sent(id, dst, tx_id, purpose, msg_sender);
1071-
}
1072967
}
1073968
}
1074969
Ok(())
@@ -1740,7 +1635,6 @@ impl AsyncUdpSocket for MagicUdpSocket {
17401635
#[derive(Debug)]
17411636
enum ActorMessage {
17421637
PingActions(Vec<PingAction>),
1743-
EndpointPingExpired(usize, stun_rs::TransactionId),
17441638
NetworkChange,
17451639
ScheduleDirectAddrUpdate(UpdateReason, Option<(NodeId, RelayUrl)>),
17461640
#[cfg(test)]
@@ -2035,9 +1929,6 @@ impl Actor {
20351929
/// Returns `true` if it was a shutdown.
20361930
async fn handle_actor_message(&mut self, msg: ActorMessage, sender: &UdpSender) {
20371931
match msg {
2038-
ActorMessage::EndpointPingExpired(id, txid) => {
2039-
self.msock.node_map.notify_ping_timeout(id, txid);
2040-
}
20411932
ActorMessage::NetworkChange => {
20421933
self.network_monitor.network_change().await.ok();
20431934
}

iroh/src/magicsock/node_map.rs

Lines changed: 10 additions & 111 deletions
Original file line numberDiff line numberDiff line change
@@ -8,15 +8,14 @@ use std::{
88
use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl};
99
use n0_future::time::Instant;
1010
use serde::{Deserialize, Serialize};
11-
use stun_rs::TransactionId;
1211
use tracing::{debug, info, instrument, trace, warn};
1312

1413
use self::{
1514
best_addr::ClearReason,
16-
node_state::{NodeState, Options, PingHandled},
15+
node_state::{NodeState, Options},
1716
};
18-
use super::{metrics::Metrics, transports, ActorMessage, NodeIdMappedAddr};
19-
use crate::disco::{CallMeMaybe, Pong, SendAddr};
17+
use super::{metrics::Metrics, NodeIdMappedAddr};
18+
use crate::disco::CallMeMaybe;
2019
#[cfg(any(test, feature = "test-utils"))]
2120
use crate::endpoint::PathSelection;
2221

@@ -25,8 +24,8 @@ mod node_state;
2524
mod path_state;
2625
mod udp_paths;
2726

27+
pub(super) use node_state::PingAction;
2828
pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo};
29-
pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing};
3029

3130
/// Number of nodes that are inactive for which we keep info about. This limit is enforced
3231
/// periodically via [`NodeMap::prune_inactive`].
@@ -71,7 +70,6 @@ pub(super) struct NodeMapInner {
7170
/// have for the node. These are all the keys the [`NodeMap`] can use.
7271
#[derive(Debug, Clone)]
7372
enum NodeStateKey {
74-
Idx(usize),
7573
NodeId(NodeId),
7674
NodeIdMappedAddr(NodeIdMappedAddr),
7775
IpPort(IpPort),
@@ -171,35 +169,6 @@ impl NodeMap {
171169
.receive_relay(relay_url, src)
172170
}
173171

174-
pub(super) fn notify_ping_sent(
175-
&self,
176-
id: usize,
177-
dst: SendAddr,
178-
tx_id: stun_rs::TransactionId,
179-
purpose: DiscoPingPurpose,
180-
msg_sender: tokio::sync::mpsc::Sender<ActorMessage>,
181-
) {
182-
if let Some(ep) = self
183-
.inner
184-
.lock()
185-
.expect("poisoned")
186-
.get_mut(NodeStateKey::Idx(id))
187-
{
188-
ep.ping_sent(dst, tx_id, purpose, msg_sender);
189-
}
190-
}
191-
192-
pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun_rs::TransactionId) {
193-
if let Some(ep) = self
194-
.inner
195-
.lock()
196-
.expect("poisoned")
197-
.get_mut(NodeStateKey::Idx(id))
198-
{
199-
ep.ping_timeout(tx_id);
200-
}
201-
}
202-
203172
pub(super) fn get_quic_mapped_addr_for_node_key(
204173
&self,
205174
node_key: NodeId,
@@ -211,38 +180,16 @@ impl NodeMap {
211180
.map(|ep| *ep.quic_mapped_addr())
212181
}
213182

214-
/// Insert a received ping into the node map, and return whether a ping with this tx_id was already
215-
/// received.
216-
pub(super) fn handle_ping(
217-
&self,
218-
sender: PublicKey,
219-
src: SendAddr,
220-
tx_id: TransactionId,
221-
) -> PingHandled {
222-
self.inner
223-
.lock()
224-
.expect("poisoned")
225-
.handle_ping(sender, src, tx_id)
226-
}
227-
228-
pub(super) fn handle_pong(&self, sender: PublicKey, src: &transports::Addr, pong: Pong) {
229-
self.inner
230-
.lock()
231-
.expect("poisoned")
232-
.handle_pong(sender, src, pong)
233-
}
234-
235-
#[must_use = "actions must be handled"]
236183
pub(super) fn handle_call_me_maybe(
237184
&self,
238185
sender: PublicKey,
239186
cm: CallMeMaybe,
240187
metrics: &Metrics,
241-
) -> Vec<PingAction> {
188+
) {
242189
self.inner
243190
.lock()
244191
.expect("poisoned")
245-
.handle_call_me_maybe(sender, cm, metrics)
192+
.handle_call_me_maybe(sender, cm, metrics);
246193
}
247194

248195
#[allow(clippy::type_complexity)]
@@ -404,7 +351,6 @@ impl NodeMapInner {
404351

405352
fn get_id(&self, id: NodeStateKey) -> Option<usize> {
406353
match id {
407-
NodeStateKey::Idx(id) => Some(id),
408354
NodeStateKey::NodeId(node_key) => self.by_node_key.get(&node_key).copied(),
409355
NodeStateKey::NodeIdMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(),
410356
NodeStateKey::IpPort(ipp) => self.by_ip_port.get(&ipp).copied(),
@@ -500,25 +446,7 @@ impl NodeMapInner {
500446
.map(|ep| ep.conn_type())
501447
}
502448

503-
fn handle_pong(&mut self, sender: NodeId, src: &transports::Addr, pong: Pong) {
504-
if let Some(ns) = self.get_mut(NodeStateKey::NodeId(sender)).as_mut() {
505-
let insert = ns.handle_pong(&pong, src.clone().into());
506-
if let Some((src, key)) = insert {
507-
self.set_node_key_for_ip_port(src, &key);
508-
}
509-
trace!(?insert, "received pong")
510-
} else {
511-
warn!("received pong: node unknown, ignore")
512-
}
513-
}
514-
515-
#[must_use = "actions must be handled"]
516-
fn handle_call_me_maybe(
517-
&mut self,
518-
sender: NodeId,
519-
cm: CallMeMaybe,
520-
metrics: &Metrics,
521-
) -> Vec<PingAction> {
449+
fn handle_call_me_maybe(&mut self, sender: NodeId, cm: CallMeMaybe, metrics: &Metrics) {
522450
let ns_id = NodeStateKey::NodeId(sender);
523451
if let Some(id) = self.get_id(ns_id.clone()) {
524452
for number in &cm.my_numbers {
@@ -530,43 +458,13 @@ impl NodeMapInner {
530458
None => {
531459
debug!("received call-me-maybe: ignore, node is unknown");
532460
metrics.recv_disco_call_me_maybe_bad_disco.inc();
533-
vec![]
534461
}
535462
Some(ns) => {
536463
debug!(endpoints = ?cm.my_numbers, "received call-me-maybe");
537464

538-
ns.handle_call_me_maybe(cm)
539-
}
540-
}
541-
}
542-
543-
fn handle_ping(&mut self, sender: NodeId, src: SendAddr, tx_id: TransactionId) -> PingHandled {
544-
#[cfg(any(test, feature = "test-utils"))]
545-
let path_selection = self.path_selection;
546-
let node_state = self.get_or_insert_with(NodeStateKey::NodeId(sender), || {
547-
debug!("received ping: node unknown, add to node map");
548-
let source = if src.is_relay() {
549-
Source::Relay
550-
} else {
551-
Source::Udp
552-
};
553-
Options {
554-
node_id: sender,
555-
relay_url: src.relay_url(),
556-
active: true,
557-
source,
558-
#[cfg(any(test, feature = "test-utils"))]
559-
path_selection,
560-
}
561-
});
562-
563-
let handled = node_state.handle_ping(src.clone(), tx_id);
564-
if let SendAddr::Udp(ref addr) = src {
565-
if matches!(handled.role, PingRole::NewPath) {
566-
self.set_node_key_for_ip_port(*addr, &sender);
465+
ns.handle_call_me_maybe(cm);
567466
}
568467
}
569-
handled
570468
}
571469

572470
/// Inserts a new node into the [`NodeMap`].
@@ -704,6 +602,7 @@ mod tests {
704602
use tracing_test::traced_test;
705603

706604
use super::{node_state::MAX_INACTIVE_DIRECT_ADDRESSES, *};
605+
use crate::disco::SendAddr;
707606

708607
impl NodeMap {
709608
#[track_caller]
@@ -836,7 +735,7 @@ mod tests {
836735
let txid = stun_rs::TransactionId::from([i as u8; 12]);
837736
// Note that this already invokes .prune_direct_addresses() because these are
838737
// new UDP paths.
839-
endpoint.handle_ping(addr, txid);
738+
// endpoint.handle_ping(addr, txid);
840739
}
841740

842741
info!("Pruning addresses");

0 commit comments

Comments
 (0)