diff --git a/Cargo.lock b/Cargo.lock index 409a3466d1b..722f2c400a1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -503,6 +503,12 @@ version = "3.17.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf" +[[package]] +name = "bytemuck" +version = "1.23.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c76a5792e44e4abe34d3abf15636779261d45a7450612059293d1d2cfc63422" + [[package]] name = "byteorder" version = "1.5.0" @@ -1213,6 +1219,18 @@ version = "0.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" +[[package]] +name = "fastbloom" +version = "0.9.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "27cea6e7f512d43b098939ff4d5a5d6fe3db07971e1d05176fe26c642d33f5b8" +dependencies = [ + "getrandom 0.3.2", + "rand 0.9.1", + "siphasher", + "wide", +] + [[package]] name = "fastrand" version = "2.3.0" @@ -2457,8 +2475,7 @@ dependencies = [ [[package]] name = "iroh-quinn" version = "0.14.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0cde160ebee7aabede6ae887460cd303c8b809054224815addf1469d54a6fcf7" +source = "git+https://github.com//n0-computer/quinn?branch=flub%2Fopen-path-ensure#646e849d540886ee58e25e3da509d6021ec94430" dependencies = [ "bytes", "cfg_aliases", @@ -2477,12 +2494,13 @@ dependencies = [ [[package]] name = "iroh-quinn-proto" version = "0.13.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "929d5d8fa77d5c304d3ee7cae9aede31f13908bd049f9de8c7c0094ad6f7c535" +source = "git+https://github.com//n0-computer/quinn?branch=flub%2Fopen-path-ensure#646e849d540886ee58e25e3da509d6021ec94430" dependencies = [ "bytes", - "getrandom 0.2.16", - "rand 0.8.5", + "fastbloom", + "getrandom 0.3.2", + "lru-slab", + "rand 0.9.1", "ring", "rustc-hash", "rustls", @@ -2497,16 +2515,15 @@ dependencies = [ [[package]] name = "iroh-quinn-udp" -version = "0.5.7" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c53afaa1049f7c83ea1331f5ebb9e6ebc5fdd69c468b7a22dd598b02c9bcc973" +version = "0.5.12" +source = "git+https://github.com//n0-computer/quinn?branch=flub%2Fopen-path-ensure#646e849d540886ee58e25e3da509d6021ec94430" dependencies = [ "cfg_aliases", "libc", "once_cell", "socket2", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2726,6 +2743,12 @@ dependencies = [ "hashbrown 0.15.3", ] +[[package]] +name = "lru-slab" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154" + [[package]] name = "mainline" version = "5.4.0" @@ -2992,8 +3015,7 @@ dependencies = [ [[package]] name = "netwatch" version = "0.6.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2a829a830199b14989f9bccce6136ab928ab48336ab1f8b9002495dbbbb2edbe" +source = "git+https://github.com/n0-computer/net-tools?branch=feat-multipath#b7ab98d4ff9cc947f2f084004b4cc2a979bb4d06" dependencies = [ "atomic-waker", "bytes", @@ -4066,9 +4088,8 @@ dependencies = [ [[package]] name = "rustls" -version = "0.23.27" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "730944ca083c1c233a75c09f199e973ca499344a2b7ba9e755c457e86fb4a321" +version = "0.23.25" +source = "git+https://github.com/n0-computer/rustls?rev=be02113e7837df60953d02c2bdd0f4634fef3a80#be02113e7837df60953d02c2bdd0f4634fef3a80" dependencies = [ "log", "once_cell", @@ -4147,9 +4168,9 @@ dependencies = [ [[package]] name = "rustls-platform-verifier" -version = "0.5.3" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "19787cda76408ec5404443dc8b31795c87cd8fec49762dc75fa727740d34acc1" +checksum = "4937d110d34408e9e5ad30ba0b0ca3b6a8a390f8db3636db60144ac4fa792750" dependencies = [ "core-foundation 0.10.0", "core-foundation-sys", @@ -4207,6 +4228,15 @@ version = "1.0.20" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "28d3b2b1366ec20994f1fd18c3c594f05c5dd4bc44d8bb0c1c632c8d6829481f" +[[package]] +name = "safe_arch" +version = "0.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "96b02de82ddbe1b636e6170c21be622223aea188ef2e139be0a5b219ec215323" +dependencies = [ + "bytemuck", +] + [[package]] name = "salsa20" version = "0.10.2" @@ -4475,6 +4505,12 @@ dependencies = [ "bitflags 2.9.0", ] +[[package]] +name = "siphasher" +version = "1.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d" + [[package]] name = "slab" version = "0.4.9" @@ -5556,6 +5592,16 @@ dependencies = [ "rustls-pki-types", ] +[[package]] +name = "wide" +version = "0.7.33" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0ce5da8ecb62bcd8ec8b7ea19f69a51275e91299be594ea5cc6ef7819e16cd03" +dependencies = [ + "bytemuck", + "safe_arch", +] + [[package]] name = "widestring" version = "1.2.0" diff --git a/Cargo.toml b/Cargo.toml index 593f1d0ec73..4e6324f349d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,3 +40,17 @@ unexpected_cfgs = { level = "warn", check-cfg = ["cfg(iroh_docsrs)", "cfg(iroh_l [workspace.lints.clippy] unused-async = "warn" + + +[patch.crates-io] +rustls = { git = "https://github.com/n0-computer/rustls", rev = "be02113e7837df60953d02c2bdd0f4634fef3a80" } +netwatch = { git = "https://github.com/n0-computer/net-tools", branch = "feat-multipath" } + +[patch."https://github.com/n0-computer/quinn"] +# iroh-quinn = { path = "../iroh-quinn/quinn" } +# iroh-quinn-proto = { path = "../iroh-quinn/quinn-proto" } +# iroh-quinn-udp = { path = "../iroh-quinn/quinn-udp" } + +iroh-quinn = { git = "https://github.com//n0-computer/quinn", branch = "flub/open-path-ensure" } +iroh-quinn-proto = { git = "https://github.com//n0-computer/quinn", branch = "flub/open-path-ensure" } +iroh-quinn-udp = { git = "https://github.com//n0-computer/quinn", branch = "flub/open-path-ensure" } diff --git a/iroh-relay/Cargo.toml b/iroh-relay/Cargo.toml index 26b0b54f821..4ea62c9a5bd 100644 --- a/iroh-relay/Cargo.toml +++ b/iroh-relay/Cargo.toml @@ -42,8 +42,8 @@ postcard = { version = "1", default-features = false, features = [ "use-std", "experimental-derive", ] } -quinn = { package = "iroh-quinn", version = "0.14.0", default-features = false, features = ["rustls-ring"] } -quinn-proto = { package = "iroh-quinn-proto", version = "0.13.0" } +quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x", default-features = false, features = ["rustls-ring"] } +quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" } rand = "0.8" reqwest = { version = "0.12", default-features = false, features = [ "rustls-tls", diff --git a/iroh/Cargo.toml b/iroh/Cargo.toml index cb545a14d98..a66ab47baef 100644 --- a/iroh/Cargo.toml +++ b/iroh/Cargo.toml @@ -48,9 +48,9 @@ pin-project = "1" pkarr = { version = "3.7", default-features = false, features = [ "relays", ] } -quinn = { package = "iroh-quinn", version = "0.14.0", default-features = false, features = ["rustls-ring"] } -quinn-proto = { package = "iroh-quinn-proto", version = "0.13.0" } -quinn-udp = { package = "iroh-quinn-udp", version = "0.5.7" } +quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x", default-features = false, features = ["rustls-ring"] } +quinn-proto = { package = "iroh-quinn-proto", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" } +quinn-udp = { package = "iroh-quinn-udp", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" } rand = "0.8" reqwest = { version = "0.12", default-features = false, features = [ "rustls-tls", @@ -109,7 +109,7 @@ hickory-resolver = "0.25.1" igd-next = { version = "0.16", features = ["aio_tokio"] } netdev = { version = "0.31.0" } portmapper = { version = "0.6.1", default-features = false } -quinn = { package = "iroh-quinn", version = "0.14.0", default-features = false, features = ["runtime-tokio", "rustls-ring"] } +quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x", default-features = false, features = ["runtime-tokio", "rustls-ring"] } tokio = { version = "1", features = [ "io-util", "macros", diff --git a/iroh/bench/Cargo.toml b/iroh/bench/Cargo.toml index c9137856371..5ae74948b2f 100644 --- a/iroh/bench/Cargo.toml +++ b/iroh/bench/Cargo.toml @@ -13,7 +13,7 @@ iroh-metrics = "0.35" n0-future = "0.1.1" n0-snafu = "0.2.0" n0-watcher = "0.2" -quinn = { package = "iroh-quinn", version = "0.14" } +quinn = { package = "iroh-quinn", git = "https://github.com/n0-computer/quinn", branch = "multipath-quinn-0.11.x" } rand = "0.8" rcgen = "0.13" rustls = { version = "0.23", default-features = false, features = ["ring"] } diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index afb37078475..2b9e0c4aa59 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -28,28 +28,6 @@ use n0_future::{time::Duration, Stream}; use n0_watcher::Watcher; use nested_enum_utils::common_fields; use pin_project::pin_project; -use snafu::{ensure, ResultExt, Snafu}; -use tracing::{debug, instrument, trace, warn}; -use url::Url; - -#[cfg(wasm_browser)] -use crate::discovery::pkarr::PkarrResolver; -#[cfg(not(wasm_browser))] -use crate::{discovery::dns::DnsDiscovery, dns::DnsResolver}; -use crate::{ - discovery::{ - pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery, DiscoveryContext, DiscoveryError, - DiscoveryItem, DiscoverySubscribers, DiscoveryTask, DynIntoDiscovery, IntoDiscovery, - IntoDiscoveryError, Lagged, UserData, - }, - magicsock::{self, Handle, NodeIdMappedAddr, OwnAddressSnafu}, - metrics::EndpointMetrics, - net_report::Report, - tls, -}; - -mod rtt_actor; - // Missing still: SendDatagram and ConnectionClose::frame_type's Type. pub use quinn::{ AcceptBi, AcceptUni, AckFrequencyConfig, ApplicationClose, Chunk, ClosedStream, @@ -66,12 +44,29 @@ pub use quinn_proto::{ }, FrameStats, PathStats, TransportError, TransportErrorCode, UdpStats, Written, }; +use snafu::{ensure, ResultExt, Snafu}; +use tracing::{debug, instrument, trace, warn}; +use url::Url; -use self::rtt_actor::RttMessage; pub use super::magicsock::{ AddNodeAddrError, ConnectionType, ControlMsg, DirectAddr, DirectAddrInfo, DirectAddrType, RemoteInfo, Source, }; +#[cfg(wasm_browser)] +use crate::discovery::pkarr::PkarrResolver; +#[cfg(not(wasm_browser))] +use crate::{discovery::dns::DnsDiscovery, dns::DnsResolver}; +use crate::{ + discovery::{ + pkarr::PkarrPublisher, ConcurrentDiscovery, Discovery, DiscoveryContext, DiscoveryError, + DiscoveryItem, DiscoverySubscribers, DiscoveryTask, DynIntoDiscovery, IntoDiscovery, + IntoDiscoveryError, Lagged, UserData, + }, + magicsock::{self, Handle, NodeIdMappedAddr, OwnAddressSnafu}, + metrics::EndpointMetrics, + net_report::Report, + tls, +}; /// The delay to fall back to discovery when direct addresses fail. /// @@ -526,8 +521,6 @@ impl StaticConfig { pub struct Endpoint { /// Handle to the magicsocket/actor msock: Handle, - /// Handle to the actor that resets the quinn RTT estimator - rtt_actor: Arc, /// Configuration structs for quinn, holds the transport config, certificate setup, secret key etc. static_config: Arc, } @@ -637,7 +630,6 @@ impl Endpoint { let metrics = msock.metrics.magicsock.clone(); let ep = Self { msock, - rtt_actor: Arc::new(rtt_actor::RttHandle::new(metrics)), static_config: Arc::new(static_config), }; Ok(ep) @@ -732,8 +724,6 @@ impl Endpoint { self.add_node_addr(node_addr.clone())?; } let node_id = node_addr.node_id; - let direct_addresses = node_addr.direct_addresses.clone(); - let relay_url = node_addr.relay_url.clone(); // Get the mapped IPv6 address from the magic socket. Quinn will connect to this // address. Start discovery for this node if it's enabled and we have no valid or @@ -751,12 +741,7 @@ impl Endpoint { // Start connecting via quinn. This will time out after 10 seconds if no reachable // address is available. - debug!( - ?mapped_addr, - ?direct_addresses, - ?relay_url, - "Attempting connection..." - ); + debug!(?mapped_addr, "Attempting connection..."); let client_config = { let mut alpn_protocols = vec![alpn.to_vec()]; alpn_protocols.extend(options.additional_alpns); @@ -769,15 +754,12 @@ impl Endpoint { client_config }; + let dest_addr = mapped_addr.private_socket_addr(); let server_name = &tls::name::encode(node_id); let connect = self .msock .endpoint() - .connect_with( - client_config, - mapped_addr.private_socket_addr(), - server_name, - ) + .connect_with(client_config, dest_addr, server_name) .context(QuinnSnafu)?; Ok(Connecting { @@ -1389,7 +1371,7 @@ impl Endpoint { None }; match addr { - Some(addr) => { + Some(maddr) => { // We have some way of dialing this node, but that doesn't actually mean // we can actually connect to any of these addresses. // Therefore, we will invoke the discovery service if we haven't received from the @@ -1401,7 +1383,7 @@ impl Endpoint { let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay) .ok() .flatten(); - Ok((addr, discovery)) + Ok((maddr, discovery)) } None => { @@ -1629,8 +1611,7 @@ impl Future for IncomingFuture { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => Poll::Ready(Err(err)), Poll::Ready(Ok(inner)) => { - let conn = Connection { inner }; - try_send_rtt_msg(&conn, this.ep, None); + let conn = Connection::new(inner, None, &this.ep); Poll::Ready(Ok(conn)) } } @@ -1717,18 +1698,18 @@ impl Connecting { pub fn into_0rtt(self) -> Result<(Connection, ZeroRttAccepted), Self> { match self.inner.into_0rtt() { Ok((inner, zrtt_accepted)) => { - let conn = Connection { inner }; - let zrtt_accepted = ZeroRttAccepted { - inner: zrtt_accepted, - _discovery_drop_guard: self._discovery_drop_guard, - }; // This call is why `self.remote_node_id` was introduced. // When we `Connecting::into_0rtt`, then we don't yet have `handshake_data` - // in our `Connection`, thus `try_send_rtt_msg` won't be able to pick up + // in our `Connection`, thus we won't be able to pick up // `Connection::remote_node_id`. // Instead, we provide `self.remote_node_id` here - we know it in advance, // after all. - try_send_rtt_msg(&conn, &self.ep, self.remote_node_id); + let conn = Connection::new(inner, self.remote_node_id, &self.ep); + let zrtt_accepted = ZeroRttAccepted { + inner: zrtt_accepted, + _discovery_drop_guard: self._discovery_drop_guard, + }; + Ok((conn, zrtt_accepted)) } Err(inner) => Err(Self { @@ -1767,8 +1748,7 @@ impl Future for Connecting { Poll::Pending => Poll::Pending, Poll::Ready(Err(err)) => Poll::Ready(Err(err)), Poll::Ready(Ok(inner)) => { - let conn = Connection { inner }; - try_send_rtt_msg(&conn, this.ep, *this.remote_node_id); + let conn = Connection::new(inner, *this.remote_node_id, &this.ep); Poll::Ready(Ok(conn)) } } @@ -1826,6 +1806,21 @@ pub struct RemoteNodeIdError { } impl Connection { + fn new(inner: quinn::Connection, remote_id: Option, ep: &Endpoint) -> Self { + let conn = Connection { inner }; + + // Grab the remote identity and register this connection + if let Some(remote) = remote_id { + ep.msock.register_connection(remote, &conn.inner); + } else if let Ok(remote) = conn.remote_node_id() { + ep.msock.register_connection(remote, &conn.inner); + } else { + warn!("unable to determine node id for the remote"); + } + + conn + } + /// Initiates a new outgoing unidirectional stream. /// /// Streams are cheap and instantaneous to open unless blocked by flow control. As a @@ -2122,30 +2117,6 @@ impl Connection { } } -/// Try send a message to the rtt-actor. -/// -/// If we can't notify the actor that will impact performance a little, but we can still -/// function. -fn try_send_rtt_msg(conn: &Connection, magic_ep: &Endpoint, remote_node_id: Option) { - // If we can't notify the rtt-actor that's not great but not critical. - let Some(node_id) = remote_node_id.or_else(|| conn.remote_node_id().ok()) else { - warn!(?conn, "failed to get remote node id"); - return; - }; - let Some(conn_type_changes) = magic_ep.conn_type(node_id) else { - warn!(?conn, "failed to create conn_type stream"); - return; - }; - let rtt_msg = RttMessage::NewConnection { - connection: conn.inner.weak_handle(), - conn_type_changes: conn_type_changes.stream(), - node_id, - }; - if let Err(err) = magic_ep.rtt_actor.msg_tx.try_send(rtt_msg) { - warn!(?conn, "rtt-actor not reachable: {err:#}"); - } -} - /// Read a proxy url from the environment, in this order /// /// - `HTTP_PROXY` diff --git a/iroh/src/endpoint/rtt_actor.rs b/iroh/src/endpoint/rtt_actor.rs deleted file mode 100644 index fa0b6847c25..00000000000 --- a/iroh/src/endpoint/rtt_actor.rs +++ /dev/null @@ -1,171 +0,0 @@ -//! Actor which coordinates the congestion controller for the magic socket - -use std::{pin::Pin, sync::Arc, task::Poll}; - -use iroh_base::NodeId; -use n0_future::{ - task::{self, AbortOnDropHandle}, - MergeUnbounded, Stream, StreamExt, -}; -use tokio::sync::mpsc; -use tracing::{debug, info_span, Instrument}; - -use crate::{magicsock::ConnectionType, metrics::MagicsockMetrics}; - -#[derive(Debug)] -pub(super) struct RttHandle { - // We should and some point use this to propagate panics and errors. - pub(super) _handle: AbortOnDropHandle<()>, - pub(super) msg_tx: mpsc::Sender, -} - -impl RttHandle { - pub(super) fn new(metrics: Arc) -> Self { - let mut actor = RttActor { - connection_events: Default::default(), - metrics, - }; - let (msg_tx, msg_rx) = mpsc::channel(16); - let handle = task::spawn( - async move { - actor.run(msg_rx).await; - } - .instrument(info_span!("rtt-actor")), - ); - Self { - _handle: AbortOnDropHandle::new(handle), - msg_tx, - } - } -} - -/// Messages to send to the [`RttActor`]. -#[derive(Debug)] -pub(super) enum RttMessage { - /// Informs the [`RttActor`] of a new connection is should monitor. - NewConnection { - /// The connection. - connection: quinn::WeakConnectionHandle, - /// Path changes for this connection from the magic socket. - conn_type_changes: n0_watcher::Stream>, - /// For reporting-only, the Node ID of this connection. - node_id: NodeId, - }, -} - -/// Actor to coordinate congestion controller state with magic socket state. -/// -/// The magic socket can change the underlying network path, between two nodes. If we can -/// inform the QUIC congestion controller of this event it will work much more efficiently. -#[derive(derive_more::Debug)] -struct RttActor { - /// Stream of connection type changes. - #[debug("MergeUnbounded>")] - connection_events: MergeUnbounded, - metrics: Arc, -} - -#[derive(Debug)] -struct MappedStream { - stream: n0_watcher::Stream>, - node_id: NodeId, - /// Reference to the connection. - connection: quinn::WeakConnectionHandle, - /// This an indiciator of whether this connection was direct before. - /// This helps establish metrics on number of connections that became direct. - was_direct_before: bool, -} - -struct ConnectionEvent { - became_direct: bool, -} - -impl Stream for MappedStream { - type Item = ConnectionEvent; - - /// Performs the congestion controller reset for a magic socket path change. - /// - /// Regardless of which kind of path we are changed to, the congestion controller needs - /// resetting. Even when switching to mixed we should reset the state as e.g. switching - /// from direct to mixed back to direct should be a rare exception and is a bug if this - /// happens commonly. - fn poll_next( - mut self: Pin<&mut Self>, - cx: &mut std::task::Context<'_>, - ) -> Poll> { - match Pin::new(&mut self.stream).poll_next(cx) { - Poll::Ready(Some(new_conn_type)) => { - let mut became_direct = false; - if self.connection.network_path_changed() { - debug!( - node_id = %self.node_id.fmt_short(), - new_type = ?new_conn_type, - "Congestion controller state reset", - ); - if !self.was_direct_before && matches!(new_conn_type, ConnectionType::Direct(_)) - { - self.was_direct_before = true; - became_direct = true - } - }; - Poll::Ready(Some(ConnectionEvent { became_direct })) - } - Poll::Ready(None) => Poll::Ready(None), - Poll::Pending => Poll::Pending, - } - } -} - -impl RttActor { - /// Runs the actor main loop. - /// - /// The main loop will finish when the sender is dropped. - async fn run(&mut self, mut msg_rx: mpsc::Receiver) { - loop { - tokio::select! { - biased; - msg = msg_rx.recv() => { - match msg { - Some(msg) => self.handle_msg(msg), - None => break, - } - } - event = self.connection_events.next(), if !self.connection_events.is_empty() => { - if event.map(|e| e.became_direct).unwrap_or(false) { - self.metrics.connection_became_direct.inc(); - } - } - } - } - debug!("rtt-actor finished"); - } - - /// Handle actor messages. - fn handle_msg(&mut self, msg: RttMessage) { - match msg { - RttMessage::NewConnection { - connection, - conn_type_changes, - node_id, - } => { - self.handle_new_connection(connection, conn_type_changes, node_id); - } - } - } - - /// Handles the new connection message. - fn handle_new_connection( - &mut self, - connection: quinn::WeakConnectionHandle, - conn_type_changes: n0_watcher::Stream>, - node_id: NodeId, - ) { - self.connection_events.push(MappedStream { - stream: conn_type_changes, - connection, - node_id, - was_direct_before: false, - }); - self.metrics.connection_handshake_success.inc(); - } -} diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 73e45a86cb3..aa1d8e39df4 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -29,7 +29,6 @@ use std::{ }; use bytes::Bytes; -use data_encoding::HEXLOWER; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey}; use iroh_relay::RelayMap; use n0_future::{ @@ -43,8 +42,9 @@ use nested_enum_utils::common_fields; use netwatch::netmon; #[cfg(not(wasm_browser))] use netwatch::{ip::LocalAddresses, UdpSocket}; -use quinn::{AsyncUdpSocket, ServerConfig}; +use quinn::{AsyncUdpSocket, ServerConfig, WeakConnectionHandle}; use rand::Rng; +use relay_mapped_addrs::{IpMappedAddr, RelayMappedAddresses}; use smallvec::SmallVec; use snafu::{ResultExt, Snafu}; use tokio::sync::{mpsc, Mutex as AsyncMutex}; @@ -59,7 +59,7 @@ use url::Url; use self::transports::IpTransport; use self::{ metrics::Metrics as MagicsockMetrics, - node_map::{NodeMap, PingAction, PingRole, SendPing}, + node_map::{NodeMap, PingAction}, transports::{RelayActorConfig, RelayTransport, Transports, UdpSender}, }; #[cfg(not(wasm_browser))] @@ -67,18 +67,19 @@ use crate::dns::DnsResolver; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; #[cfg(not(wasm_browser))] -use crate::net_report::{IpMappedAddr, QuicConfig}; +use crate::net_report::QuicConfig; use crate::{ defaults::timeouts::NET_REPORT_TIMEOUT, disco::{self, SendAddr}, discovery::{Discovery, DiscoveryItem, DiscoverySubscribers, NodeData, UserData}, key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret}, metrics::EndpointMetrics, - net_report::{self, IfStateDetails, IpMappedAddresses, Report}, + net_report::{self, IfStateDetails, Report}, }; mod metrics; mod node_map; +mod relay_mapped_addrs; pub(crate) mod transports; @@ -195,8 +196,11 @@ pub(crate) struct MagicSock { ipv6_reported: Arc, /// Tracks the networkmap node entity for each node discovery key. node_map: NodeMap, + /// Tracks existing connections + connection_map: ConnectionMap, + /// Tracks the mapped IP addresses - ip_mapped_addrs: IpMappedAddresses, + relay_mapped_addrs: RelayMappedAddresses, /// Local addresses local_addrs_watch: LocalAddrsWatch, /// Currently bound IP addresses of all sockets @@ -221,6 +225,22 @@ pub(crate) struct MagicSock { pub(crate) metrics: EndpointMetrics, } +#[derive(Default, Debug)] +struct ConnectionMap { + map: std::sync::Mutex>>, +} + +impl ConnectionMap { + fn insert(&self, remote: NodeId, handle: WeakConnectionHandle) { + self.map + .lock() + .expect("poisoned") + .entry(remote) + .or_default() + .push(handle); + } +} + #[allow(missing_docs)] #[common_fields({ backtrace: Option, @@ -271,6 +291,33 @@ impl MagicSock { self.local_addrs_watch.get().expect("disconnected") } + pub(crate) fn register_connection(&self, remote: NodeId, conn: &quinn::Connection) { + let weak_handle = conn.weak_handle(); + self.connection_map.insert(remote, weak_handle); + + // TODO: track task + // TODO: find a good home for this + let mut path_events = conn.path_events(); + let _task = task::spawn(async move { + loop { + match path_events.recv().await { + Ok(event) => { + info!(remote = %remote, "path event: {:?}", event); + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => { + warn!("lagged path events"); + } + Err(tokio::sync::broadcast::error::RecvError::Closed) => break, + } + } + }); + + // open additional paths + if let Some(addr) = self.node_map.get_current_addr(remote) { + self.add_paths(addr); + } + } + #[cfg(not(wasm_browser))] fn ip_bind_addrs(&self) -> &[SocketAddr] { &self.ip_bind_addrs @@ -378,6 +425,10 @@ impl MagicSock { self.node_map.get_quic_mapped_addr_for_node_key(node_id) } + pub(crate) fn get_direct_addrs(&self, node_id: NodeId) -> Vec { + self.node_map.get_direct_addrs(node_id) + } + /// Add addresses for a node to the magic socket's addresbook. #[instrument(skip_all)] pub fn add_node_addr( @@ -393,8 +444,18 @@ impl MagicSock { } } if !addr.is_empty() { + // Add addr to the internal NodeMap self.node_map - .add_node_addr(addr, source, &self.metrics.magicsock); + .add_node_addr(addr.clone(), source, &self.metrics.magicsock); + + if let Some(url) = addr.relay_url() { + self.relay_mapped_addrs + .get_or_register(url.clone(), addr.node_id); + } + + // Add paths to the existing connections + self.add_paths(addr); + Ok(()) } else if pruned != 0 { Err(EmptyPrunedSnafu { pruned }.build()) @@ -403,6 +464,69 @@ impl MagicSock { } } + /// Adds all available addresses in the given `addr` as paths + fn add_paths(&self, addr: NodeAddr) { + let mut map = self.connection_map.map.lock().expect("poisoned"); + let mut to_delete = Vec::new(); + if let Some(conns) = map.get_mut(&addr.node_id) { + for (i, conn) in conns.into_iter().enumerate() { + if let Some(conn) = conn.upgrade() { + for addr in addr.direct_addresses() { + let conn = conn.clone(); + let addr = *addr; + task::spawn(async move { + match conn + .open_path_ensure(addr, quinn_proto::PathStatus::Available) + .await + { + Ok(path) => { + path.set_max_idle_timeout(Some( + ENDPOINTS_FRESH_ENOUGH_DURATION, + )) + .ok(); + path.set_keep_alive_interval(Some(HEARTBEAT_INTERVAL)).ok(); + } + Err(err) => { + warn!("failed to open path {:?}", err); + } + } + }); + } + // Insert the relay addr + if let Some(addr) = self.get_mapping_addr(addr.node_id) { + let conn = conn.clone(); + let addr = addr.private_socket_addr(); + task::spawn(async move { + match conn + .open_path_ensure(addr, quinn_proto::PathStatus::Backup) + .await + { + Ok(path) => { + // Keep the relay path open + path.set_max_idle_timeout(None).ok(); + path.set_keep_alive_interval(None).ok(); + } + Err(err) => { + warn!("failed to open path {:?}", err); + } + } + }); + } + } else { + to_delete.push(i); + } + } + // cleanup dead connections + let mut i = 0; + conns.retain(|_| { + let remove = to_delete.contains(&i); + i += 1; + + !remove + }); + } + } + /// Stores a new set of direct addresses. /// /// If the direct addresses have changed from the previous set, they are published to @@ -504,11 +628,8 @@ impl MagicSock { let mut active_paths = SmallVec::<[_; 3]>::new(); - match MappedAddr::from(transmit.destination) { - MappedAddr::None(dest) => { - error!(%dest, "Cannot convert to a mapped address."); - } - MappedAddr::NodeId(dest) => { + match MultipathMappedAddr::from(transmit.destination) { + MultipathMappedAddr::Mixed(dest) => { trace!( dst = %dest, src = ?transmit.src_ip, @@ -529,12 +650,14 @@ impl MagicSock { .try_send(ActorMessage::PingActions(ping_actions)) .ok(); } - if let Some(addr) = udp_addr { - active_paths.push(transports::Addr::from(addr)); - } + // Mixed will send all available addrs + if let Some(url) = relay_url { active_paths.push(transports::Addr::Relay(url, node_id)); } + if let Some(addr) = udp_addr { + active_paths.push(transports::Addr::Ip(addr)); + } } None => { error!(%dest, "no NodeState for mapped address"); @@ -542,7 +665,10 @@ impl MagicSock { } } #[cfg(not(wasm_browser))] - MappedAddr::Ip(dest) => { + MultipathMappedAddr::Ip(addr) => { + active_paths.push(transports::Addr::Ip(addr)); + } + MultipathMappedAddr::Relay(dest) => { trace!( dst = %dest, src = ?transmit.src_ip, @@ -552,9 +678,9 @@ impl MagicSock { // Check if this is a known IpMappedAddr, and if so, send over UDP // Get the socket addr - match self.ip_mapped_addrs.get_ip_addr(&dest) { - Some(addr) => { - active_paths.push(transports::Addr::from(addr)); + match self.relay_mapped_addrs.get_url(&dest) { + Some((relay, node_id)) => { + active_paths.push(transports::Addr::Relay(relay, node_id)); } None => { error!(%dest, "unknown mapped address"); @@ -677,29 +803,15 @@ impl MagicSock { // Update the NodeMap and remap RecvMeta to the NodeIdMappedAddr. match self.node_map.receive_udp(*addr) { None => { - // Check if this address is mapped to an IpMappedAddr - if let Some(ip_mapped_addr) = - self.ip_mapped_addrs.get_mapped_addr(addr) - { - trace!( - src = %addr, - count = %quic_datagram_count, - len = quinn_meta.len, - "UDP recv QUIC address discovery packets", - ); - quic_packets_total += quic_datagram_count; - quinn_meta.addr = ip_mapped_addr.private_socket_addr(); - } else { - warn!( - src = %addr, - count = %quic_datagram_count, - len = quinn_meta.len, - "UDP recv quic packets: no node state found, skipping", - ); - // If we have no node state for the from addr, set len to 0 to make - // quinn skip the buf completely. - quinn_meta.len = 0; - } + trace!( + src = %addr, + count = %quic_datagram_count, + len = quinn_meta.len, + "UDP recv quic packets", + ); + + quic_packets_total += quic_datagram_count; + quinn_meta.addr = *addr; } Some((node_id, quic_mapped_addr)) => { trace!( @@ -716,8 +828,11 @@ impl MagicSock { } transports::Addr::Relay(src_url, src_node) => { // Relay - let quic_mapped_addr = self.node_map.receive_relay(src_url, *src_node); - quinn_meta.addr = quic_mapped_addr.private_socket_addr(); + let _quic_mapped_addr = self.node_map.receive_relay(src_url, *src_node); + let mapped_addr = self + .relay_mapped_addrs + .get_or_register(src_url.clone(), *src_node); + quinn_meta.addr = mapped_addr.private_socket_addr(); } } } else { @@ -789,13 +904,8 @@ impl MagicSock { let _guard = span.enter(); trace!("receive disco message"); match dm { - disco::Message::Ping(ping) => { - self.metrics.magicsock.recv_disco_ping.inc(); - self.handle_ping(ping, sender, src); - } - disco::Message::Pong(pong) => { - self.metrics.magicsock.recv_disco_pong.inc(); - self.node_map.handle_pong(sender, src, pong); + disco::Message::Ping(..) | disco::Message::Pong(..) => { + unreachable!("not used anymore"); } disco::Message::CallMeMaybe(cm) => { self.metrics.magicsock.recv_disco_call_me_maybe.inc(); @@ -814,98 +924,27 @@ impl MagicSock { return; } } - let ping_actions = - self.node_map - .handle_call_me_maybe(sender, cm, &self.metrics.magicsock); - for action in ping_actions { - match action { - PingAction::SendCallMeMaybe { .. } => { - warn!("Unexpected CallMeMaybe as response of handling a CallMeMaybe"); - } - PingAction::SendPing(ping) => { - self.send_ping_queued(ping); - } - } - } - } - } - trace!("disco message handled"); - } - - /// Handle a ping message. - fn handle_ping(&self, dm: disco::Ping, sender: NodeId, src: &transports::Addr) { - // Insert the ping into the node map, and return whether a ping with this tx_id was already - // received. - let addr: SendAddr = src.clone().into(); - let handled = self.node_map.handle_ping(sender, addr.clone(), dm.tx_id); - match handled.role { - PingRole::Duplicate => { - debug!(?src, tx = %HEXLOWER.encode(&dm.tx_id), "received ping: path already confirmed, skip"); - return; - } - PingRole::LikelyHeartbeat => {} - PingRole::NewPath => { - debug!(?src, tx = %HEXLOWER.encode(&dm.tx_id), "received ping: new path"); - } - PingRole::Activate => { - debug!(?src, tx = %HEXLOWER.encode(&dm.tx_id), "received ping: path active"); - } - } - - // Send a pong. - debug!(tx = %HEXLOWER.encode(&dm.tx_id), %addr, dstkey = %sender.fmt_short(), - "sending pong"); - let pong = disco::Message::Pong(disco::Pong { - tx_id: dm.tx_id, - ping_observed_addr: addr.clone(), - }); - event!( - target: "iroh::_events::pong::sent", - Level::DEBUG, - remote_node = %sender.fmt_short(), - dst = ?addr, - txn = ?dm.tx_id, - ); - - if !self.disco.try_send(addr.clone(), sender, pong) { - warn!(%addr, "failed to queue pong"); - } - if let Some(ping) = handled.needs_ping_back { - debug!( - %addr, - dstkey = %sender.fmt_short(), - "sending direct ping back", - ); - self.send_ping_queued(ping); - } - } + // Add new addresses as paths + self.add_paths(NodeAddr { + node_id: sender, + relay_url: None, + direct_addresses: cm.my_numbers.iter().copied().collect(), + }); - fn send_ping_queued(&self, ping: SendPing) { - let SendPing { - id, - dst, - dst_node, - tx_id, - purpose, - } = ping; - let msg = disco::Message::Ping(disco::Ping { - tx_id, - node_key: self.public_key, - }); - let sent = self.disco.try_send(dst.clone(), dst_node, msg); - if sent { - let msg_sender = self.actor_sender.clone(); - trace!(%dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "ping sent (queued)"); - self.node_map - .notify_ping_sent(id, dst, tx_id, purpose, msg_sender); - } else { - warn!(dst = ?dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "failed to send ping: queues full"); + self.node_map + .handle_call_me_maybe(sender, cm, &self.metrics.magicsock); + } } + trace!("disco message handled"); } /// Send the given ping actions out. - async fn send_ping_actions(&self, sender: &UdpSender, msgs: Vec) -> io::Result<()> { + async fn send_ping_actions( + &self, + _sender: &UdpSender, + msgs: Vec, + ) -> io::Result<()> { for msg in msgs { // Abort sending as soon as we know we are shutting down. if self.is_closing() || self.is_closed() { @@ -950,25 +989,6 @@ impl MagicSock { } } } - PingAction::SendPing(SendPing { - id, - dst, - dst_node, - tx_id, - purpose, - }) => { - let msg = disco::Message::Ping(disco::Ping { - tx_id, - node_key: self.public_key, - }); - - self.send_disco_message(sender, dst.clone(), dst_node, msg) - .await?; - debug!(%dst, tx = %HEXLOWER.encode(&tx_id), ?purpose, "ping sent"); - let msg_sender = self.actor_sender.clone(); - self.node_map - .notify_ping_sent(id, dst, tx_id, purpose, msg_sender); - } } } Ok(()) @@ -1042,27 +1062,35 @@ impl MagicSock { } } +/// Definies the translation of addresses in quinn land vs iroh land. +/// +/// This is necessary, because quinn can only reason about `SocketAddr`s. #[derive(Clone, Debug)] -enum MappedAddr { - NodeId(NodeIdMappedAddr), +pub(crate) enum MultipathMappedAddr { + /// Used for the initial connection. + /// - Only used for sending + /// - This means send on all known paths/transports + Mixed(NodeIdMappedAddr), + /// Relay based transport address + Relay(IpMappedAddr), // TODO: RelayMappedAddr? + /// IP based transport address #[cfg(not(wasm_browser))] - Ip(IpMappedAddr), - None(SocketAddr), + Ip(SocketAddr), } -impl From for MappedAddr { +impl From for MultipathMappedAddr { fn from(value: SocketAddr) -> Self { match value.ip() { - IpAddr::V4(_) => MappedAddr::None(value), + IpAddr::V4(_) => Self::Ip(value), IpAddr::V6(addr) => { if let Ok(node_id_mapped_addr) = NodeIdMappedAddr::try_from(addr) { - return MappedAddr::NodeId(node_id_mapped_addr); + return Self::Mixed(node_id_mapped_addr); } #[cfg(not(wasm_browser))] if let Ok(ip_mapped_addr) = IpMappedAddr::try_from(addr) { - return MappedAddr::Ip(ip_mapped_addr); + return Self::Relay(ip_mapped_addr); } - MappedAddr::None(value) + Self::Ip(value) } } } @@ -1247,7 +1275,7 @@ impl Handle { let (ip_transports, port_mapper) = bind_ip(addr_v4, addr_v6, &metrics).context(BindSocketsSnafu)?; - let ip_mapped_addrs = IpMappedAddresses::default(); + let relay_mapped_addrs = RelayMappedAddresses::default(); let (actor_sender, actor_receiver) = mpsc::channel(256); @@ -1293,7 +1321,8 @@ impl Handle { actor_sender: actor_sender.clone(), ipv6_reported, node_map, - ip_mapped_addrs: ip_mapped_addrs.clone(), + connection_map: Default::default(), + relay_mapped_addrs, discovery, discovery_user_data: RwLock::new(discovery_user_data), direct_addrs: Default::default(), @@ -1365,7 +1394,6 @@ impl Handle { #[cfg(not(wasm_browser))] dns_resolver, #[cfg(not(wasm_browser))] - Some(ip_mapped_addrs), relay_map.clone(), net_report_config, metrics.net_report.clone(), @@ -1639,7 +1667,6 @@ impl AsyncUdpSocket for MagicUdpSocket { #[derive(Debug)] enum ActorMessage { PingActions(Vec), - EndpointPingExpired(usize, stun_rs::TransactionId), NetworkChange, ScheduleDirectAddrUpdate(UpdateReason, Option<(NodeId, RelayUrl)>), #[cfg(test)] @@ -1934,9 +1961,6 @@ impl Actor { /// Returns `true` if it was a shutdown. async fn handle_actor_message(&mut self, msg: ActorMessage, sender: &UdpSender) { match msg { - ActorMessage::EndpointPingExpired(id, txid) => { - self.msock.node_map.notify_ping_timeout(id, txid); - } ActorMessage::NetworkChange => { self.network_monitor.network_change().await.ok(); } @@ -2622,10 +2646,12 @@ mod tests { info!("stats: {:#?}", stats); // TODO: ensure panics in this function are reported ok if matches!(loss, ExpectedLoss::AlmostNone) { - assert!( - stats.path.lost_packets < 10, - "[receiver] should not loose many packets", - ); + for (id, path) in &stats.paths { + assert!( + path.lost_packets < 10, + "[receiver] path {id:?} should not loose many packets", + ); + } } info!("close"); @@ -2673,10 +2699,12 @@ mod tests { let stats = conn.stats(); info!("stats: {:#?}", stats); if matches!(loss, ExpectedLoss::AlmostNone) { - assert!( - stats.path.lost_packets < 10, - "[sender] should not loose many packets", - ); + for (id, path) in &stats.paths { + assert!( + path.lost_packets < 10, + "[sender] path {id:?} should not loose many packets", + ); + } } info!("close"); @@ -3173,17 +3201,18 @@ mod tests { let _accept_task = AbortOnDropHandle::new(accept_task); // Add an empty entry in the NodeMap of ep_1 - msock_1.node_map.add_node_addr( - NodeAddr { - node_id: node_id_2, - relay_url: None, - direct_addresses: Default::default(), - }, - Source::NamedApp { - name: "test".into(), - }, - &msock_1.metrics.magicsock, - ); + msock_1 + .add_node_addr( + NodeAddr { + node_id: node_id_2, + relay_url: None, + direct_addresses: Default::default(), + }, + Source::NamedApp { + name: "test".into(), + }, + ) + .unwrap(); let addr_2 = msock_1.get_mapping_addr(node_id_2).unwrap(); // Set a low max_idle_timeout so quinn gives up on this quickly and our test does @@ -3210,24 +3239,25 @@ mod tests { info!("first connect timed out as expected"); // Provide correct addressing information - msock_1.node_map.add_node_addr( - NodeAddr { - node_id: node_id_2, - relay_url: None, - direct_addresses: msock_2 - .direct_addresses() - .initialized() - .await - .expect("no direct addrs") - .into_iter() - .map(|x| x.addr) - .collect(), - }, - Source::NamedApp { - name: "test".into(), - }, - &msock_1.metrics.magicsock, - ); + msock_1 + .add_node_addr( + NodeAddr { + node_id: node_id_2, + relay_url: None, + direct_addresses: msock_2 + .direct_addresses() + .initialized() + .await + .expect("no direct addrs") + .into_iter() + .map(|x| x.addr) + .collect(), + }, + Source::NamedApp { + name: "test".into(), + }, + ) + .unwrap(); // We can now connect tokio::time::timeout(Duration::from_secs(10), async move { diff --git a/iroh/src/magicsock/node_map.rs b/iroh/src/magicsock/node_map.rs index 252f5d0eb97..71dc317f8e3 100644 --- a/iroh/src/magicsock/node_map.rs +++ b/iroh/src/magicsock/node_map.rs @@ -8,15 +8,14 @@ use std::{ use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; use n0_future::time::Instant; use serde::{Deserialize, Serialize}; -use stun_rs::TransactionId; use tracing::{debug, info, instrument, trace, warn}; use self::{ best_addr::ClearReason, - node_state::{NodeState, Options, PingHandled}, + node_state::{NodeState, Options}, }; -use super::{metrics::Metrics, transports, ActorMessage, NodeIdMappedAddr}; -use crate::disco::{CallMeMaybe, Pong, SendAddr}; +use super::{metrics::Metrics, NodeIdMappedAddr}; +use crate::disco::CallMeMaybe; #[cfg(any(test, feature = "test-utils"))] use crate::endpoint::PathSelection; @@ -25,8 +24,8 @@ mod node_state; mod path_state; mod udp_paths; +pub(super) use node_state::PingAction; pub use node_state::{ConnectionType, ControlMsg, DirectAddrInfo, RemoteInfo}; -pub(super) use node_state::{DiscoPingPurpose, PingAction, PingRole, SendPing}; /// Number of nodes that are inactive for which we keep info about. This limit is enforced /// periodically via [`NodeMap::prune_inactive`]. @@ -71,7 +70,6 @@ pub(super) struct NodeMapInner { /// have for the node. These are all the keys the [`NodeMap`] can use. #[derive(Debug, Clone)] enum NodeStateKey { - Idx(usize), NodeId(NodeId), NodeIdMappedAddr(NodeIdMappedAddr), IpPort(IpPort), @@ -171,35 +169,6 @@ impl NodeMap { .receive_relay(relay_url, src) } - pub(super) fn notify_ping_sent( - &self, - id: usize, - dst: SendAddr, - tx_id: stun_rs::TransactionId, - purpose: DiscoPingPurpose, - msg_sender: tokio::sync::mpsc::Sender, - ) { - if let Some(ep) = self - .inner - .lock() - .expect("poisoned") - .get_mut(NodeStateKey::Idx(id)) - { - ep.ping_sent(dst, tx_id, purpose, msg_sender); - } - } - - pub(super) fn notify_ping_timeout(&self, id: usize, tx_id: stun_rs::TransactionId) { - if let Some(ep) = self - .inner - .lock() - .expect("poisoned") - .get_mut(NodeStateKey::Idx(id)) - { - ep.ping_timeout(tx_id); - } - } - pub(super) fn get_quic_mapped_addr_for_node_key( &self, node_key: NodeId, @@ -211,38 +180,33 @@ impl NodeMap { .map(|ep| *ep.quic_mapped_addr()) } - /// Insert a received ping into the node map, and return whether a ping with this tx_id was already - /// received. - pub(super) fn handle_ping( - &self, - sender: PublicKey, - src: SendAddr, - tx_id: TransactionId, - ) -> PingHandled { + pub(super) fn get_direct_addrs(&self, node_key: NodeId) -> Vec { self.inner .lock() .expect("poisoned") - .handle_ping(sender, src, tx_id) + .get(NodeStateKey::NodeId(node_key)) + .map(|ep| ep.direct_addresses().map(Into::into).collect()) + .unwrap_or_default() } - pub(super) fn handle_pong(&self, sender: PublicKey, src: &transports::Addr, pong: Pong) { + pub(super) fn get_current_addr(&self, node_key: NodeId) -> Option { self.inner .lock() .expect("poisoned") - .handle_pong(sender, src, pong) + .get(NodeStateKey::NodeId(node_key)) + .map(|ep| ep.get_current_addr()) } - #[must_use = "actions must be handled"] pub(super) fn handle_call_me_maybe( &self, sender: PublicKey, cm: CallMeMaybe, metrics: &Metrics, - ) -> Vec { + ) { self.inner .lock() .expect("poisoned") - .handle_call_me_maybe(sender, cm, metrics) + .handle_call_me_maybe(sender, cm, metrics); } #[allow(clippy::type_complexity)] @@ -404,7 +368,6 @@ impl NodeMapInner { fn get_id(&self, id: NodeStateKey) -> Option { match id { - NodeStateKey::Idx(id) => Some(id), NodeStateKey::NodeId(node_key) => self.by_node_key.get(&node_key).copied(), NodeStateKey::NodeIdMappedAddr(addr) => self.by_quic_mapped_addr.get(&addr).copied(), NodeStateKey::IpPort(ipp) => self.by_ip_port.get(&ipp).copied(), @@ -500,25 +463,7 @@ impl NodeMapInner { .map(|ep| ep.conn_type()) } - fn handle_pong(&mut self, sender: NodeId, src: &transports::Addr, pong: Pong) { - if let Some(ns) = self.get_mut(NodeStateKey::NodeId(sender)).as_mut() { - let insert = ns.handle_pong(&pong, src.clone().into()); - if let Some((src, key)) = insert { - self.set_node_key_for_ip_port(src, &key); - } - trace!(?insert, "received pong") - } else { - warn!("received pong: node unknown, ignore") - } - } - - #[must_use = "actions must be handled"] - fn handle_call_me_maybe( - &mut self, - sender: NodeId, - cm: CallMeMaybe, - metrics: &Metrics, - ) -> Vec { + fn handle_call_me_maybe(&mut self, sender: NodeId, cm: CallMeMaybe, metrics: &Metrics) { let ns_id = NodeStateKey::NodeId(sender); if let Some(id) = self.get_id(ns_id.clone()) { for number in &cm.my_numbers { @@ -530,43 +475,13 @@ impl NodeMapInner { None => { debug!("received call-me-maybe: ignore, node is unknown"); metrics.recv_disco_call_me_maybe_bad_disco.inc(); - vec![] } Some(ns) => { debug!(endpoints = ?cm.my_numbers, "received call-me-maybe"); - ns.handle_call_me_maybe(cm) - } - } - } - - fn handle_ping(&mut self, sender: NodeId, src: SendAddr, tx_id: TransactionId) -> PingHandled { - #[cfg(any(test, feature = "test-utils"))] - let path_selection = self.path_selection; - let node_state = self.get_or_insert_with(NodeStateKey::NodeId(sender), || { - debug!("received ping: node unknown, add to node map"); - let source = if src.is_relay() { - Source::Relay - } else { - Source::Udp - }; - Options { - node_id: sender, - relay_url: src.relay_url(), - active: true, - source, - #[cfg(any(test, feature = "test-utils"))] - path_selection, - } - }); - - let handled = node_state.handle_ping(src.clone(), tx_id); - if let SendAddr::Udp(ref addr) = src { - if matches!(handled.role, PingRole::NewPath) { - self.set_node_key_for_ip_port(*addr, &sender); + ns.handle_call_me_maybe(cm); } } - handled } /// Inserts a new node into the [`NodeMap`]. @@ -704,6 +619,7 @@ mod tests { use tracing_test::traced_test; use super::{node_state::MAX_INACTIVE_DIRECT_ADDRESSES, *}; + use crate::disco::SendAddr; impl NodeMap { #[track_caller] @@ -836,7 +752,7 @@ mod tests { let txid = stun_rs::TransactionId::from([i as u8; 12]); // Note that this already invokes .prune_direct_addresses() because these are // new UDP paths. - endpoint.handle_ping(addr, txid); + // endpoint.handle_ping(addr, txid); } info!("Pruning addresses"); diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs index 48866e27813..cba37266277 100644 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ b/iroh/src/magicsock/node_map/best_addr.rs @@ -32,7 +32,6 @@ impl BestAddrInner { #[derive(Debug)] pub(super) enum Source { - ReceivedPong, BestCandidate, Udp, } @@ -40,7 +39,6 @@ pub(super) enum Source { impl Source { fn trust_until(&self, from: Instant) -> Instant { match self { - Source::ReceivedPong => from + TRUST_UDP_ADDR_DURATION, // TODO: Fix time Source::BestCandidate => from + Duration::from_secs(60 * 60), Source::Udp => from + TRUST_UDP_ADDR_DURATION, @@ -58,7 +56,6 @@ pub(super) enum State<'a> { #[derive(Debug, Clone, Copy)] pub enum ClearReason { Inactive, - PongTimeout, MatchesOurLocalAddr, } diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index be1e0a58dbe..32d00d7e888 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -1,18 +1,12 @@ use std::{ - collections::{btree_map::Entry, BTreeSet, HashMap}, - hash::Hash, + collections::{BTreeSet, HashMap}, net::{IpAddr, SocketAddr}, }; -use data_encoding::HEXLOWER; use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl}; -use n0_future::{ - task::{self, AbortOnDropHandle}, - time::{self, Duration, Instant}, -}; +use n0_future::time::{Duration, Instant}; use n0_watcher::Watchable; use serde::{Deserialize, Serialize}; -use tokio::sync::mpsc; use tracing::{debug, event, info, instrument, trace, warn, Level}; use super::{ @@ -25,7 +19,7 @@ use super::{ use crate::endpoint::PathSelection; use crate::{ disco::{self, SendAddr}, - magicsock::{ActorMessage, MagicsockMetrics, NodeIdMappedAddr, HEARTBEAT_INTERVAL}, + magicsock::{MagicsockMetrics, NodeIdMappedAddr, HEARTBEAT_INTERVAL}, }; /// Number of addresses that are not active that we keep around per node. @@ -36,9 +30,6 @@ pub(super) const MAX_INACTIVE_DIRECT_ADDRESSES: usize = 20; /// How long since an endpoint path was last alive before it might be pruned. const LAST_ALIVE_PRUNE_DURATION: Duration = Duration::from_secs(120); -/// How long we wait for a pong reply before assuming it's never coming. -const PING_TIMEOUT_DURATION: Duration = Duration::from_secs(5); - /// The latency at or under which we don't try to upgrade to a better path. const GOOD_ENOUGH_LATENCY: Duration = Duration::from_millis(5); @@ -49,46 +40,12 @@ pub(super) const SESSION_ACTIVE_TIMEOUT: Duration = Duration::from_secs(45); /// How often we try to upgrade to a better patheven if we have some non-relay route that works. const UPGRADE_INTERVAL: Duration = Duration::from_secs(60); -/// How long until we send a stayin alive ping -const STAYIN_ALIVE_MIN_ELAPSED: Duration = Duration::from_secs(2); - #[derive(Debug)] pub(in crate::magicsock) enum PingAction { SendCallMeMaybe { relay_url: RelayUrl, dst_node: NodeId, }, - SendPing(SendPing), -} - -#[derive(Debug)] -pub(in crate::magicsock) struct SendPing { - pub id: usize, - pub dst: SendAddr, - pub dst_node: NodeId, - pub tx_id: stun_rs::TransactionId, - pub purpose: DiscoPingPurpose, -} - -/// Indicating an [`NodeState`] has handled a ping. -#[derive(Debug)] -pub struct PingHandled { - /// What this ping did to the [`NodeState`]. - pub role: PingRole, - /// Whether the sender path should also be pinged. - /// - /// This is the case if an [`NodeState`] does not yet have a direct path, i.e. it has no - /// best_addr. In this case we want to ping right back to open the direct path in this - /// direction as well. - pub needs_ping_back: Option, -} - -#[derive(Debug)] -pub enum PingRole { - Duplicate, - NewPath, - LikelyHeartbeat, - Activate, } /// An iroh node, which we can have connections with. @@ -106,14 +63,11 @@ pub(super) struct NodeState { quic_mapped_addr: NodeIdMappedAddr, /// The global identifier for this endpoint. node_id: NodeId, - /// The last time we pinged all endpoints. - last_full_ping: Option, /// The url of relay node that we can relay over to communicate. /// /// The fallback/bootstrap path, if non-zero (non-zero for well-behaved clients). relay_url: Option<(RelayUrl, PathState)>, udp_paths: NodeUdpPaths, - sent_pings: HashMap, /// Last time this node was used. /// /// A node is marked as in use when sending datagrams to them, or when having received @@ -171,7 +125,6 @@ impl NodeState { id, quic_mapped_addr, node_id: options.node_id, - last_full_ping: None, relay_url: options.relay_url.map(|url| { ( url.clone(), @@ -179,7 +132,6 @@ impl NodeState { ) }), udp_paths: NodeUdpPaths::new(), - sent_pings: HashMap::new(), last_used: options.active.then(Instant::now), last_call_me_maybe: None, conn_type: Watchable::new(ConnectionType::None), @@ -208,32 +160,7 @@ impl NodeState { /// Returns info about this node. pub(super) fn info(&self, now: Instant) -> RemoteInfo { let conn_type = self.conn_type.get(); - let latency = match conn_type { - ConnectionType::Direct(addr) => self - .udp_paths - .paths - .get(&addr.into()) - .and_then(|state| state.latency()), - ConnectionType::Relay(ref url) => self - .relay_url - .as_ref() - .filter(|(relay_url, _)| relay_url == url) - .and_then(|(_, state)| state.latency()), - ConnectionType::Mixed(addr, ref url) => { - let addr_latency = self - .udp_paths - .paths - .get(&addr.into()) - .and_then(|state| state.latency()); - let relay_latency = self - .relay_url - .as_ref() - .filter(|(relay_url, _)| relay_url == url) - .and_then(|(_, state)| state.latency()); - addr_latency.min(relay_latency) - } - ConnectionType::None => None, - }; + let latency = None; let addrs = self .udp_paths @@ -241,7 +168,7 @@ impl NodeState { .iter() .map(|(addr, path_state)| DirectAddrInfo { addr: SocketAddr::from(*addr), - latency: path_state.recent_pong.as_ref().map(|pong| pong.latency), + latency: None, last_control: path_state.last_control_msg(now), last_payload: path_state .last_payload_msg @@ -394,10 +321,6 @@ impl NodeState { #[instrument("want_call_me_maybe", skip_all)] fn want_call_me_maybe(&self, now: &Instant) -> bool { trace!("full ping: wanted?"); - let Some(last_full_ping) = self.last_full_ping else { - debug!("no previous full ping: need full ping"); - return true; - }; match self.udp_paths.best_addr.state(*now) { best_addr::State::Empty => { debug!("best addr not set: need full ping"); @@ -408,7 +331,7 @@ impl NodeState { true } best_addr::State::Valid(addr) => { - if addr.latency > GOOD_ENOUGH_LATENCY && *now - last_full_ping >= UPGRADE_INTERVAL { + if addr.latency > GOOD_ENOUGH_LATENCY { debug!( "full ping interval expired and latency is only {}ms: need full ping", addr.latency.as_millis() @@ -428,139 +351,6 @@ impl NodeState { false } - /// Cleanup the expired ping for the passed in txid. - #[instrument("disco", skip_all, fields(node = %self.node_id.fmt_short()))] - pub(super) fn ping_timeout(&mut self, txid: stun_rs::TransactionId) { - if let Some(sp) = self.sent_pings.remove(&txid) { - debug!(tx = %HEXLOWER.encode(&txid), addr = %sp.to, "pong not received in timeout"); - match sp.to { - SendAddr::Udp(addr) => { - if let Some(path_state) = self.udp_paths.paths.get_mut(&addr.into()) { - path_state.last_ping = None; - let consider_alive = path_state - .last_alive() - .map(|last_alive| last_alive.elapsed() <= PING_TIMEOUT_DURATION) - .unwrap_or(false); - if !consider_alive { - // If there was no sign of life from this path during the time - // which we should have received the pong, clear best addr and - // pong. Both are used to select this path again, but we know - // it's not a usable path now. - path_state.recent_pong = None; - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url().is_some(), - ) - } - } else { - // If we have no state for the best addr it should have been cleared - // anyway. - self.udp_paths.best_addr.clear_if_equals( - addr, - ClearReason::PongTimeout, - self.relay_url.is_some(), - ); - } - } - SendAddr::Relay(ref url) => { - if let Some((home_relay, relay_state)) = self.relay_url.as_mut() { - if home_relay == url { - // lost connectivity via relay - relay_state.last_ping = None; - } - } - } - } - } - } - - #[must_use = "pings must be handled"] - fn start_ping(&self, dst: SendAddr, purpose: DiscoPingPurpose) -> Option { - #[cfg(any(test, feature = "test-utils"))] - if self.path_selection == PathSelection::RelayOnly && !dst.is_relay() { - // don't attempt any hole punching in relay only mode - warn!("in `RelayOnly` mode, ignoring request to start a hole punching attempt."); - return None; - } - #[cfg(wasm_browser)] - if !dst.is_relay() { - return None; // Similar to `RelayOnly` mode, we don't send UDP pings for hole-punching. - } - - let tx_id = stun_rs::TransactionId::default(); - trace!(tx = %HEXLOWER.encode(&tx_id), %dst, ?purpose, - dst = %self.node_id.fmt_short(), "start ping"); - event!( - target: "iroh::_events::ping::sent", - Level::DEBUG, - remote_node = %self.node_id.fmt_short(), - ?dst, - txn = ?tx_id, - ?purpose, - ); - Some(SendPing { - id: self.id, - dst, - dst_node: self.node_id, - tx_id, - purpose, - }) - } - - /// Record the fact that a ping has been sent out. - pub(super) fn ping_sent( - &mut self, - to: SendAddr, - tx_id: stun_rs::TransactionId, - purpose: DiscoPingPurpose, - sender: mpsc::Sender, - ) { - trace!(%to, tx = %HEXLOWER.encode(&tx_id), ?purpose, "record ping sent"); - - let now = Instant::now(); - let mut path_found = false; - match to { - SendAddr::Udp(addr) => { - if let Some(st) = self.udp_paths.paths.get_mut(&addr.into()) { - st.last_ping.replace(now); - path_found = true - } - } - SendAddr::Relay(ref url) => { - if let Some((home_relay, relay_state)) = self.relay_url.as_mut() { - if home_relay == url { - relay_state.last_ping.replace(now); - path_found = true - } - } - } - } - if !path_found { - // Shouldn't happen. But don't ping an endpoint that's not active for us. - warn!(%to, ?purpose, "unexpected attempt to ping no longer live path"); - return; - } - - let id = self.id; - let _expiry_task = AbortOnDropHandle::new(task::spawn(async move { - time::sleep(PING_TIMEOUT_DURATION).await; - sender - .send(ActorMessage::EndpointPingExpired(id, tx_id)) - .await - .ok(); - })); - self.sent_pings.insert( - tx_id, - SentPing { - to, - at: now, - purpose, - _expiry_task, - }, - ); - } - /// Send a DISCO call-me-maybe message to the peer. /// /// This takes care of sending the needed pings beforehand. This ensures that we open @@ -587,11 +377,7 @@ impl NodeState { } } } - // We send pings regardless of whether we have a RelayUrl. If we were given any - // direct address paths to contact but no RelayUrl, we still need to send a DISCO - // ping to the direct address paths so that the other node will learn about us and - // accepts the connection. - let mut msgs = self.send_pings(now); + let mut msgs = Vec::new(); if let Some(url) = self.relay_url() { debug!(%url, "queue call-me-maybe"); @@ -607,59 +393,6 @@ impl NodeState { msgs } - /// Send DISCO Pings to all the paths of this node. - /// - /// Any paths to the node which have not been recently pinged will be sent a disco - /// ping. - /// - /// The caller is responsible for sending the messages. - #[must_use = "actions must be handled"] - fn send_pings(&mut self, now: Instant) -> Vec { - // We allocate +1 in case the caller wants to add a call-me-maybe message. - let mut ping_msgs = Vec::with_capacity(self.udp_paths.paths.len() + 1); - - if let Some((url, state)) = self.relay_url.as_ref() { - if state.needs_ping(&now) { - debug!(%url, "relay path needs ping"); - if let Some(msg) = - self.start_ping(SendAddr::Relay(url.clone()), DiscoPingPurpose::Discovery) - { - ping_msgs.push(PingAction::SendPing(msg)) - } - } - } - - #[cfg(any(test, feature = "test-utils"))] - if self.path_selection == PathSelection::RelayOnly { - warn!("in `RelayOnly` mode, ignoring request to respond to a hole punching attempt."); - return ping_msgs; - } - - self.prune_direct_addresses(); - let mut ping_dsts = String::from("["); - self.udp_paths - .paths - .iter() - .filter_map(|(ipp, state)| state.needs_ping(&now).then_some(*ipp)) - .filter_map(|ipp| { - self.start_ping(SendAddr::Udp(ipp.into()), DiscoPingPurpose::Discovery) - }) - .for_each(|msg| { - use std::fmt::Write; - write!(&mut ping_dsts, " {} ", msg.dst).ok(); - ping_msgs.push(PingAction::SendPing(msg)); - }); - ping_dsts.push(']'); - debug!( - %ping_dsts, - dst = %self.node_id.fmt_short(), - paths = %summarize_node_paths(&self.udp_paths.paths), - "sending pings to node", - ); - self.last_full_ping.replace(now); - ping_msgs - } - pub(super) fn update_from_node_addr( &mut self, new_relay_url: Option<&RelayUrl>, @@ -709,114 +442,6 @@ impl NodeState { debug!(new = ?new_addrs , %paths, "added new direct paths for endpoint"); } - /// Handle a received Disco Ping. - /// - /// - Ensures the paths the ping was received on is a known path for this endpoint. - /// - /// - If there is no best_addr for this endpoint yet, sends a ping itself to try and - /// establish one. - /// - /// This is called once we've already verified that we got a valid discovery message - /// from `self` via ep. - pub(super) fn handle_ping( - &mut self, - path: SendAddr, - tx_id: stun_rs::TransactionId, - ) -> PingHandled { - let now = Instant::now(); - - let role = match path { - SendAddr::Udp(addr) => match self.udp_paths.paths.entry(addr.into()) { - Entry::Occupied(mut occupied) => occupied.get_mut().handle_ping(tx_id, now), - Entry::Vacant(vacant) => { - info!(%addr, "new direct addr for node"); - vacant.insert(PathState::with_ping( - self.node_id, - path.clone(), - tx_id, - Source::Udp, - now, - )); - PingRole::NewPath - } - }, - SendAddr::Relay(ref url) => { - match self.relay_url.as_mut() { - Some((home_url, _state)) if home_url != url => { - // either the node changed relays or we didn't have a relay address for the - // node. In both cases, trust the new confirmed url - info!(%url, "new relay addr for node"); - self.relay_url = Some(( - url.clone(), - PathState::with_ping( - self.node_id, - path.clone(), - tx_id, - Source::Relay, - now, - ), - )); - PingRole::NewPath - } - Some((_home_url, state)) => state.handle_ping(tx_id, now), - None => { - info!(%url, "new relay addr for node"); - self.relay_url = Some(( - url.clone(), - PathState::with_ping( - self.node_id, - path.clone(), - tx_id, - Source::Relay, - now, - ), - )); - PingRole::NewPath - } - } - } - }; - event!( - target: "iroh::_events::ping::recv", - Level::DEBUG, - remote_node = %self.node_id.fmt_short(), - src = ?path, - txn = ?tx_id, - ?role, - ); - - if matches!(path, SendAddr::Udp(_)) && matches!(role, PingRole::NewPath) { - self.prune_direct_addresses(); - } - - // if the endpoint does not yet have a best_addrr - let needs_ping_back = if matches!(path, SendAddr::Udp(_)) - && matches!( - self.udp_paths.best_addr.state(now), - best_addr::State::Empty | best_addr::State::Outdated(_) - ) { - // We also need to send a ping to make this path available to us as well. This - // is always sent together with a pong. So in the worst case the pong gets lost - // and this ping does not. In that case we ping-pong until both sides have - // received at least one pong. Once both sides have received one pong they both - // have a best_addr and this ping will stop being sent. - self.start_ping(path, DiscoPingPurpose::PingBack) - } else { - None - }; - - debug!( - ?role, - needs_ping_back = ?needs_ping_back.is_some(), - paths = %summarize_node_paths(&self.udp_paths.paths), - "endpoint handled ping", - ); - PingHandled { - role, - needs_ping_back, - } - } - /// Prune inactive paths. /// /// This trims the list of inactive paths for an endpoint. At most @@ -869,110 +494,6 @@ impl NodeState { } } - /// Handles a Pong message (a reply to an earlier ping). - /// - /// It reports the address and key that should be inserted for the endpoint if any. - #[instrument(skip(self))] - pub(super) fn handle_pong( - &mut self, - m: &disco::Pong, - src: SendAddr, - ) -> Option<(SocketAddr, PublicKey)> { - event!( - target: "iroh::_events::pong::recv", - Level::DEBUG, - remote_node = self.node_id.fmt_short(), - ?src, - txn = ?m.tx_id, - ); - let is_relay = src.is_relay(); - match self.sent_pings.remove(&m.tx_id) { - None => { - // This is not a pong for a ping we sent. In reality however we probably - // did send this ping but it has timed-out by the time we receive this pong - // so we removed the state already. - debug!(tx = %HEXLOWER.encode(&m.tx_id), "received unknown pong (did it timeout?)"); - None - } - Some(sp) => { - let mut node_map_insert = None; - - let now = Instant::now(); - let latency = now - sp.at; - - debug!( - tx = %HEXLOWER.encode(&m.tx_id), - src = %src, - reported_ping_src = %m.ping_observed_addr, - ping_dst = %sp.to, - is_relay = %src.is_relay(), - latency = %latency.as_millis(), - "received pong", - ); - - match src { - SendAddr::Udp(addr) => { - match self.udp_paths.paths.get_mut(&addr.into()) { - None => { - warn!("ignoring pong: no state for src addr"); - // This is no longer an endpoint we care about. - return node_map_insert; - } - Some(st) => { - node_map_insert = Some((addr, self.node_id)); - st.add_pong_reply(PongReply { - latency, - pong_at: now, - from: src, - pong_src: m.ping_observed_addr.clone(), - }); - } - } - debug!( - paths = %summarize_node_paths(&self.udp_paths.paths), - "handled pong", - ); - } - SendAddr::Relay(ref url) => match self.relay_url.as_mut() { - Some((home_url, state)) if home_url == url => { - state.add_pong_reply(PongReply { - latency, - pong_at: now, - from: src, - pong_src: m.ping_observed_addr.clone(), - }); - } - other => { - // if we are here then we sent this ping, but the url changed - // waiting for the response. It was either set to None or changed to - // another relay. This should either never happen or be extremely - // unlikely. Log and ignore for now - warn!( - stored=?other, - received=?url, - "ignoring pong via relay for different relay from last one", - ); - } - }, - } - - // Promote this pong response to our current best address if it's lower latency. - // TODO(bradfitz): decide how latency vs. preference order affects decision - if let SendAddr::Udp(to) = sp.to { - debug_assert!(!is_relay, "mismatching relay & udp"); - self.udp_paths.best_addr.insert_if_better_or_reconfirm( - to, - latency, - best_addr::Source::ReceivedPong, - now, - ); - } - - node_map_insert - } - } - } - /// Handles a DISCO CallMeMaybe discovery message. /// /// The contract for use of this message is that the node has already pinged to us via @@ -983,7 +504,7 @@ impl NodeState { /// had any [`IpPort`]s to send pings to and our pings might end up blocked. But at /// least open the firewalls on our side, giving the other side another change of making /// it through when it pings in response. - pub(super) fn handle_call_me_maybe(&mut self, m: disco::CallMeMaybe) -> Vec { + pub(super) fn handle_call_me_maybe(&mut self, m: disco::CallMeMaybe) { let now = Instant::now(); let mut call_me_maybe_ipps = BTreeSet::new(); @@ -1012,20 +533,6 @@ impl NodeState { .replace(now); } - // Zero out all the last_ping times to force send_pings to send new ones, even if - // it's been less than 5 seconds ago. Also clear pongs for direct addresses not - // included in the updated set. - for (ipp, st) in self.udp_paths.paths.iter_mut() { - st.last_ping = None; - if !call_me_maybe_ipps.contains(ipp) { - // TODO: This seems like a weird way to signal that the endpoint no longer - // thinks it has this IpPort as an available path. - if st.recent_pong.is_some() { - debug!(path=?ipp ,"clearing recent pong"); - st.recent_pong = None; - } - } - } // Clear trust on our best_addr if it is not included in the updated set. Also // clear the last call-me-maybe send time so we will send one again. if let Some(addr) = self.udp_paths.best_addr.addr() { @@ -1041,7 +548,6 @@ impl NodeState { paths = %summarize_node_paths(&self.udp_paths.paths), "updated endpoint paths from call-me-maybe", ); - self.send_pings(now) } /// Marks this node as having received a UDP payload message. @@ -1082,21 +588,6 @@ impl NodeState { self.last_used = Some(now); } - pub(super) fn last_ping(&self, addr: &SendAddr) -> Option { - match addr { - SendAddr::Udp(addr) => self - .udp_paths - .paths - .get(&(*addr).into()) - .and_then(|ep| ep.last_ping), - SendAddr::Relay(url) => self - .relay_url - .as_ref() - .filter(|(home_url, _state)| home_url == url) - .and_then(|(_home_url, state)| state.last_ping), - } - } - /// Checks if this `Endpoint` is currently actively being used. pub(super) fn is_active(&self, now: &Instant) -> bool { match self.last_used { @@ -1122,29 +613,6 @@ impl NodeState { return self.send_call_me_maybe(now, SendCallMeMaybe::Always); } - // Send heartbeat ping to keep the current addr going as long as we need it. - if let Some(udp_addr) = self.udp_paths.best_addr.addr() { - let elapsed = self.last_ping(&SendAddr::Udp(udp_addr)).map(|l| now - l); - // Send a ping if the last ping is older than 2 seconds. - let needs_ping = match elapsed { - Some(e) => e >= STAYIN_ALIVE_MIN_ELAPSED, - None => false, - }; - - if needs_ping { - debug!( - dst = %udp_addr, - since_last_ping=?elapsed, - "send stayin alive ping", - ); - if let Some(msg) = - self.start_ping(SendAddr::Udp(udp_addr), DiscoPingPurpose::StayinAlive) - { - return vec![PingAction::SendPing(msg)]; - } - } - } - Vec::new() } @@ -1179,6 +647,17 @@ impl NodeState { (udp_addr, relay_url, ping_msgs) } + pub(crate) fn get_current_addr(&self) -> NodeAddr { + // TODO: more selective? + let mut node_addr = + NodeAddr::new(self.node_id).with_direct_addresses(self.udp_paths.addrs()); + if let Some((url, _)) = &self.relay_url { + node_addr = node_addr.with_relay_url(url.clone()); + } + + node_addr + } + /// Get the direct addresses for this endpoint. pub(super) fn direct_addresses(&self) -> impl Iterator + '_ { self.udp_paths.paths.keys().copied() @@ -1220,26 +699,6 @@ enum SendCallMeMaybe { IfNoRecent, } -#[derive(Debug, Clone, PartialEq, Eq, Hash)] -pub(super) struct PongReply { - pub(super) latency: Duration, - /// When we received the pong. - pub(super) pong_at: Instant, - /// The pong's src (usually same as endpoint map key). - pub(super) from: SendAddr, - /// What they reported they heard. - pub(super) pong_src: SendAddr, -} - -#[derive(Debug)] -pub(super) struct SentPing { - pub(super) to: SendAddr, - pub(super) at: Instant, - #[allow(dead_code)] - pub(super) purpose: DiscoPingPurpose, - pub(super) _expiry_task: AbortOnDropHandle<()>, -} - /// The reason why a discovery ping message was sent. #[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum DiscoPingPurpose { @@ -1336,7 +795,7 @@ impl From<(RelayUrl, PathState)> for RelayUrlInfo { RelayUrlInfo { relay_url: value.0, last_alive: value.1.last_alive().map(|i| i.elapsed()), - latency: value.1.latency(), + latency: None, } } } @@ -1449,262 +908,262 @@ mod tests { use super::*; use crate::magicsock::node_map::{NodeMap, NodeMapInner}; - #[test] - fn test_remote_infos() { - let now = Instant::now(); - let elapsed = Duration::from_secs(3); - let later = now + elapsed; - let send_addr: RelayUrl = "https://my-relay.com".parse().unwrap(); - let pong_src = SendAddr::Udp("0.0.0.0:1".parse().unwrap()); - let latency = Duration::from_millis(50); - - let relay_and_state = |node_id: NodeId, url: RelayUrl| { - let relay_state = PathState::with_pong_reply( - node_id, - PongReply { - latency, - pong_at: now, - from: SendAddr::Relay(send_addr.clone()), - pong_src: pong_src.clone(), - }, - ); - Some((url, relay_state)) - }; - - // endpoint with a `best_addr` that has a latency but no relay - let (a_endpoint, a_socket_addr) = { - let key = SecretKey::generate(rand::thread_rng()); - let node_id = key.public(); - let ip_port = IpPort { - ip: Ipv4Addr::UNSPECIFIED.into(), - port: 10, - }; - let endpoint_state = BTreeMap::from([( - ip_port, - PathState::with_pong_reply( - node_id, - PongReply { - latency, - pong_at: now, - from: SendAddr::Udp(ip_port.into()), - pong_src: pong_src.clone(), - }, - ), - )]); - ( - NodeState { - id: 0, - quic_mapped_addr: NodeIdMappedAddr::generate(), - node_id: key.public(), - last_full_ping: None, - relay_url: None, - udp_paths: NodeUdpPaths::from_parts( - endpoint_state, - BestAddr::from_parts( - ip_port.into(), - latency, - now, - now + Duration::from_secs(100), - ), - ), - sent_pings: HashMap::new(), - last_used: Some(now), - last_call_me_maybe: None, - conn_type: Watchable::new(ConnectionType::Direct(ip_port.into())), - has_been_direct: true, - #[cfg(any(test, feature = "test-utils"))] - path_selection: PathSelection::default(), - }, - ip_port.into(), - ) - }; - // endpoint w/ no best addr but a relay w/ latency - let b_endpoint = { - // let socket_addr = "0.0.0.0:9".parse().unwrap(); - let key = SecretKey::generate(rand::thread_rng()); - NodeState { - id: 1, - quic_mapped_addr: NodeIdMappedAddr::generate(), - node_id: key.public(), - last_full_ping: None, - relay_url: relay_and_state(key.public(), send_addr.clone()), - udp_paths: NodeUdpPaths::new(), - sent_pings: HashMap::new(), - last_used: Some(now), - last_call_me_maybe: None, - conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, - #[cfg(any(test, feature = "test-utils"))] - path_selection: PathSelection::default(), - } - }; - - // endpoint w/ no best addr but a relay w/ no latency - let c_endpoint = { - // let socket_addr = "0.0.0.0:8".parse().unwrap(); - let key = SecretKey::generate(rand::thread_rng()); - NodeState { - id: 2, - quic_mapped_addr: NodeIdMappedAddr::generate(), - node_id: key.public(), - last_full_ping: None, - relay_url: Some(( - send_addr.clone(), - PathState::new( - key.public(), - SendAddr::from(send_addr.clone()), - Source::App, - now, - ), - )), - udp_paths: NodeUdpPaths::new(), - sent_pings: HashMap::new(), - last_used: Some(now), - last_call_me_maybe: None, - conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), - has_been_direct: false, - #[cfg(any(test, feature = "test-utils"))] - path_selection: PathSelection::default(), - } - }; - - // endpoint w/ expired best addr and relay w/ latency - let (d_endpoint, d_socket_addr) = { - let socket_addr: SocketAddr = "0.0.0.0:7".parse().unwrap(); - let expired = now.checked_sub(Duration::from_secs(100)).unwrap(); - let key = SecretKey::generate(rand::thread_rng()); - let node_id = key.public(); - let endpoint_state = BTreeMap::from([( - IpPort::from(socket_addr), - PathState::with_pong_reply( - node_id, - PongReply { - latency, - pong_at: now, - from: SendAddr::Udp(socket_addr), - pong_src: pong_src.clone(), - }, - ), - )]); - ( - NodeState { - id: 3, - quic_mapped_addr: NodeIdMappedAddr::generate(), - node_id: key.public(), - last_full_ping: None, - relay_url: relay_and_state(key.public(), send_addr.clone()), - udp_paths: NodeUdpPaths::from_parts( - endpoint_state, - BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), - ), - sent_pings: HashMap::new(), - last_used: Some(now), - last_call_me_maybe: None, - conn_type: Watchable::new(ConnectionType::Mixed( - socket_addr, - send_addr.clone(), - )), - has_been_direct: false, - #[cfg(any(test, feature = "test-utils"))] - path_selection: PathSelection::default(), - }, - socket_addr, - ) - }; - - let mut expect = Vec::from([ - RemoteInfo { - node_id: a_endpoint.node_id, - relay_url: None, - addrs: Vec::from([DirectAddrInfo { - addr: a_socket_addr, - latency: Some(latency), - last_control: Some((elapsed, ControlMsg::Pong)), - last_payload: None, - last_alive: Some(elapsed), - sources: HashMap::new(), - }]), - conn_type: ConnectionType::Direct(a_socket_addr), - latency: Some(latency), - last_used: Some(elapsed), - }, - RemoteInfo { - node_id: b_endpoint.node_id, - relay_url: Some(RelayUrlInfo { - relay_url: b_endpoint.relay_url.as_ref().unwrap().0.clone(), - last_alive: None, - latency: Some(latency), - }), - addrs: Vec::new(), - conn_type: ConnectionType::Relay(send_addr.clone()), - latency: Some(latency), - last_used: Some(elapsed), - }, - RemoteInfo { - node_id: c_endpoint.node_id, - relay_url: Some(RelayUrlInfo { - relay_url: c_endpoint.relay_url.as_ref().unwrap().0.clone(), - last_alive: None, - latency: None, - }), - addrs: Vec::new(), - conn_type: ConnectionType::Relay(send_addr.clone()), - latency: None, - last_used: Some(elapsed), - }, - RemoteInfo { - node_id: d_endpoint.node_id, - relay_url: Some(RelayUrlInfo { - relay_url: d_endpoint.relay_url.as_ref().unwrap().0.clone(), - last_alive: None, - latency: Some(latency), - }), - addrs: Vec::from([DirectAddrInfo { - addr: d_socket_addr, - latency: Some(latency), - last_control: Some((elapsed, ControlMsg::Pong)), - last_payload: None, - last_alive: Some(elapsed), - sources: HashMap::new(), - }]), - conn_type: ConnectionType::Mixed(d_socket_addr, send_addr.clone()), - latency: Some(Duration::from_millis(50)), - last_used: Some(elapsed), - }, - ]); - - let node_map = NodeMap::from_inner(NodeMapInner { - by_node_key: HashMap::from([ - (a_endpoint.node_id, a_endpoint.id), - (b_endpoint.node_id, b_endpoint.id), - (c_endpoint.node_id, c_endpoint.id), - (d_endpoint.node_id, d_endpoint.id), - ]), - by_ip_port: HashMap::from([ - (a_socket_addr.into(), a_endpoint.id), - (d_socket_addr.into(), d_endpoint.id), - ]), - by_quic_mapped_addr: HashMap::from([ - (a_endpoint.quic_mapped_addr, a_endpoint.id), - (b_endpoint.quic_mapped_addr, b_endpoint.id), - (c_endpoint.quic_mapped_addr, c_endpoint.id), - (d_endpoint.quic_mapped_addr, d_endpoint.id), - ]), - by_id: HashMap::from([ - (a_endpoint.id, a_endpoint), - (b_endpoint.id, b_endpoint), - (c_endpoint.id, c_endpoint), - (d_endpoint.id, d_endpoint), - ]), - next_id: 5, - path_selection: PathSelection::default(), - }); - let mut got = node_map.list_remote_infos(later); - got.sort_by_key(|p| p.node_id); - expect.sort_by_key(|p| p.node_id); - remove_non_deterministic_fields(&mut got); - assert_eq!(expect, got); - } + // #[test] + // fn test_remote_infos() { + // let now = Instant::now(); + // let elapsed = Duration::from_secs(3); + // let later = now + elapsed; + // let send_addr: RelayUrl = "https://my-relay.com".parse().unwrap(); + // let pong_src = SendAddr::Udp("0.0.0.0:1".parse().unwrap()); + // let latency = Duration::from_millis(50); + + // let relay_and_state = |node_id: NodeId, url: RelayUrl| { + // let relay_state = PathState::with_pong_reply( + // node_id, + // PongReply { + // latency, + // pong_at: now, + // from: SendAddr::Relay(send_addr.clone()), + // pong_src: pong_src.clone(), + // }, + // ); + // Some((url, relay_state)) + // }; + + // // endpoint with a `best_addr` that has a latency but no relay + // let (a_endpoint, a_socket_addr) = { + // let key = SecretKey::generate(rand::thread_rng()); + // let node_id = key.public(); + // let ip_port = IpPort { + // ip: Ipv4Addr::UNSPECIFIED.into(), + // port: 10, + // }; + // let endpoint_state = BTreeMap::from([( + // ip_port, + // PathState::with_pong_reply( + // node_id, + // PongReply { + // latency, + // pong_at: now, + // from: SendAddr::Udp(ip_port.into()), + // pong_src: pong_src.clone(), + // }, + // ), + // )]); + // ( + // NodeState { + // id: 0, + // quic_mapped_addr: NodeIdMappedAddr::generate(), + // node_id: key.public(), + // last_full_ping: None, + // relay_url: None, + // udp_paths: NodeUdpPaths::from_parts( + // endpoint_state, + // BestAddr::from_parts( + // ip_port.into(), + // latency, + // now, + // now + Duration::from_secs(100), + // ), + // ), + // sent_pings: HashMap::new(), + // last_used: Some(now), + // last_call_me_maybe: None, + // conn_type: Watchable::new(ConnectionType::Direct(ip_port.into())), + // has_been_direct: true, + // #[cfg(any(test, feature = "test-utils"))] + // path_selection: PathSelection::default(), + // }, + // ip_port.into(), + // ) + // }; + // // endpoint w/ no best addr but a relay w/ latency + // let b_endpoint = { + // // let socket_addr = "0.0.0.0:9".parse().unwrap(); + // let key = SecretKey::generate(rand::thread_rng()); + // NodeState { + // id: 1, + // quic_mapped_addr: NodeIdMappedAddr::generate(), + // node_id: key.public(), + // last_full_ping: None, + // relay_url: relay_and_state(key.public(), send_addr.clone()), + // udp_paths: NodeUdpPaths::new(), + // sent_pings: HashMap::new(), + // last_used: Some(now), + // last_call_me_maybe: None, + // conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), + // has_been_direct: false, + // #[cfg(any(test, feature = "test-utils"))] + // path_selection: PathSelection::default(), + // } + // }; + + // // endpoint w/ no best addr but a relay w/ no latency + // let c_endpoint = { + // // let socket_addr = "0.0.0.0:8".parse().unwrap(); + // let key = SecretKey::generate(rand::thread_rng()); + // NodeState { + // id: 2, + // quic_mapped_addr: NodeIdMappedAddr::generate(), + // node_id: key.public(), + // last_full_ping: None, + // relay_url: Some(( + // send_addr.clone(), + // PathState::new( + // key.public(), + // SendAddr::from(send_addr.clone()), + // Source::App, + // now, + // ), + // )), + // udp_paths: NodeUdpPaths::new(), + // sent_pings: HashMap::new(), + // last_used: Some(now), + // last_call_me_maybe: None, + // conn_type: Watchable::new(ConnectionType::Relay(send_addr.clone())), + // has_been_direct: false, + // #[cfg(any(test, feature = "test-utils"))] + // path_selection: PathSelection::default(), + // } + // }; + + // // endpoint w/ expired best addr and relay w/ latency + // let (d_endpoint, d_socket_addr) = { + // let socket_addr: SocketAddr = "0.0.0.0:7".parse().unwrap(); + // let expired = now.checked_sub(Duration::from_secs(100)).unwrap(); + // let key = SecretKey::generate(rand::thread_rng()); + // let node_id = key.public(); + // let endpoint_state = BTreeMap::from([( + // IpPort::from(socket_addr), + // PathState::with_pong_reply( + // node_id, + // PongReply { + // latency, + // pong_at: now, + // from: SendAddr::Udp(socket_addr), + // pong_src: pong_src.clone(), + // }, + // ), + // )]); + // ( + // NodeState { + // id: 3, + // quic_mapped_addr: NodeIdMappedAddr::generate(), + // node_id: key.public(), + // last_full_ping: None, + // relay_url: relay_and_state(key.public(), send_addr.clone()), + // udp_paths: NodeUdpPaths::from_parts( + // endpoint_state, + // BestAddr::from_parts(socket_addr, Duration::from_millis(80), now, expired), + // ), + // sent_pings: HashMap::new(), + // last_used: Some(now), + // last_call_me_maybe: None, + // conn_type: Watchable::new(ConnectionType::Mixed( + // socket_addr, + // send_addr.clone(), + // )), + // has_been_direct: false, + // #[cfg(any(test, feature = "test-utils"))] + // path_selection: PathSelection::default(), + // }, + // socket_addr, + // ) + // }; + + // let mut expect = Vec::from([ + // RemoteInfo { + // node_id: a_endpoint.node_id, + // relay_url: None, + // addrs: Vec::from([DirectAddrInfo { + // addr: a_socket_addr, + // latency: Some(latency), + // last_control: Some((elapsed, ControlMsg::Pong)), + // last_payload: None, + // last_alive: Some(elapsed), + // sources: HashMap::new(), + // }]), + // conn_type: ConnectionType::Direct(a_socket_addr), + // latency: Some(latency), + // last_used: Some(elapsed), + // }, + // RemoteInfo { + // node_id: b_endpoint.node_id, + // relay_url: Some(RelayUrlInfo { + // relay_url: b_endpoint.relay_url.as_ref().unwrap().0.clone(), + // last_alive: None, + // latency: Some(latency), + // }), + // addrs: Vec::new(), + // conn_type: ConnectionType::Relay(send_addr.clone()), + // latency: Some(latency), + // last_used: Some(elapsed), + // }, + // RemoteInfo { + // node_id: c_endpoint.node_id, + // relay_url: Some(RelayUrlInfo { + // relay_url: c_endpoint.relay_url.as_ref().unwrap().0.clone(), + // last_alive: None, + // latency: None, + // }), + // addrs: Vec::new(), + // conn_type: ConnectionType::Relay(send_addr.clone()), + // latency: None, + // last_used: Some(elapsed), + // }, + // RemoteInfo { + // node_id: d_endpoint.node_id, + // relay_url: Some(RelayUrlInfo { + // relay_url: d_endpoint.relay_url.as_ref().unwrap().0.clone(), + // last_alive: None, + // latency: Some(latency), + // }), + // addrs: Vec::from([DirectAddrInfo { + // addr: d_socket_addr, + // latency: Some(latency), + // last_control: Some((elapsed, ControlMsg::Pong)), + // last_payload: None, + // last_alive: Some(elapsed), + // sources: HashMap::new(), + // }]), + // conn_type: ConnectionType::Mixed(d_socket_addr, send_addr.clone()), + // latency: Some(Duration::from_millis(50)), + // last_used: Some(elapsed), + // }, + // ]); + + // let node_map = NodeMap::from_inner(NodeMapInner { + // by_node_key: HashMap::from([ + // (a_endpoint.node_id, a_endpoint.id), + // (b_endpoint.node_id, b_endpoint.id), + // (c_endpoint.node_id, c_endpoint.id), + // (d_endpoint.node_id, d_endpoint.id), + // ]), + // by_ip_port: HashMap::from([ + // (a_socket_addr.into(), a_endpoint.id), + // (d_socket_addr.into(), d_endpoint.id), + // ]), + // by_quic_mapped_addr: HashMap::from([ + // (a_endpoint.quic_mapped_addr, a_endpoint.id), + // (b_endpoint.quic_mapped_addr, b_endpoint.id), + // (c_endpoint.quic_mapped_addr, c_endpoint.id), + // (d_endpoint.quic_mapped_addr, d_endpoint.id), + // ]), + // by_id: HashMap::from([ + // (a_endpoint.id, a_endpoint), + // (b_endpoint.id, b_endpoint), + // (c_endpoint.id, c_endpoint), + // (d_endpoint.id, d_endpoint), + // ]), + // next_id: 5, + // path_selection: PathSelection::default(), + // }); + // let mut got = node_map.list_remote_infos(later); + // got.sort_by_key(|p| p.node_id); + // expect.sort_by_key(|p| p.node_id); + // remove_non_deterministic_fields(&mut got); + // assert_eq!(expect, got); + // } fn remove_non_deterministic_fields(infos: &mut [RemoteInfo]) { for info in infos.iter_mut() { @@ -1737,10 +1196,6 @@ mod tests { .collect(); let call_me_maybe = disco::CallMeMaybe { my_numbers }; - let ping_messages = ep.handle_call_me_maybe(call_me_maybe); - - // We have no relay server and no previous direct addresses, so we should get the same - // number of pings as direct addresses in the call-me-maybe. - assert_eq!(ping_messages.len(), my_numbers_count as usize); + ep.handle_call_me_maybe(call_me_maybe); } } diff --git a/iroh/src/magicsock/node_map/path_state.rs b/iroh/src/magicsock/node_map/path_state.rs index 2d6855cab30..377019c4e26 100644 --- a/iroh/src/magicsock/node_map/path_state.rs +++ b/iroh/src/magicsock/node_map/path_state.rs @@ -7,19 +7,12 @@ use std::{ use iroh_base::NodeId; use n0_future::time::{Duration, Instant}; -use tracing::{debug, event, Level}; use super::{ - node_state::{ControlMsg, PongReply, SESSION_ACTIVE_TIMEOUT}, - IpPort, PingRole, Source, + node_state::{ControlMsg, SESSION_ACTIVE_TIMEOUT}, + IpPort, Source, }; -use crate::{disco::SendAddr, magicsock::HEARTBEAT_INTERVAL}; - -/// The minimum time between pings to an endpoint. -/// -/// Except in the case of CallMeMaybe frames resetting the counter, as the first pings -/// likely didn't through the firewall. -const DISCO_PING_INTERVAL: Duration = Duration::from_secs(5); +use crate::disco::SendAddr; /// State about a particular path to another [`NodeState`]. /// @@ -32,22 +25,10 @@ pub(super) struct PathState { node_id: NodeId, /// The path this applies for. path: SendAddr, - /// The last (outgoing) ping time. - pub(super) last_ping: Option, - - /// If non-zero, means that this was an endpoint that we learned about at runtime (from an - /// incoming ping). If so, we keep the time updated and use it to discard old candidates. - // NOTE: tx_id Originally added in tailscale due to . - last_got_ping: Option<(Instant, stun_rs::TransactionId)>, /// The time this endpoint was last advertised via a call-me-maybe DISCO message. pub(super) call_me_maybe_time: Option, - /// The most recent [`PongReply`]. - /// - /// Previous replies are cleared when they are no longer relevant to determine whether - /// this path can still be used to reach the remote node. - pub(super) recent_pong: Option, /// When the last payload data was **received** via this path. /// /// This excludes DISCO messages. @@ -67,10 +48,7 @@ impl PathState { Self { node_id, path, - last_ping: None, - last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, last_payload_msg: None, sources, } @@ -94,56 +72,12 @@ impl PathState { PathState { node_id, path, - last_ping: None, - last_got_ping: None, call_me_maybe_time: None, - recent_pong: None, last_payload_msg: Some(now), sources, } } - pub(super) fn with_ping( - node_id: NodeId, - path: SendAddr, - tx_id: stun_rs::TransactionId, - source: Source, - now: Instant, - ) -> Self { - let mut new = PathState::new(node_id, path, source, now); - new.handle_ping(tx_id, now); - new - } - - pub(super) fn add_pong_reply(&mut self, r: PongReply) { - if let SendAddr::Udp(ref path) = self.path { - if self.recent_pong.is_none() { - event!( - target: "iroh::_events::holepunched", - Level::DEBUG, - remote_node = %self.node_id.fmt_short(), - path = ?path, - direction = "outgoing", - ); - } - } - self.recent_pong = Some(r); - } - - #[cfg(test)] - pub(super) fn with_pong_reply(node_id: NodeId, r: PongReply) -> Self { - PathState { - node_id, - path: r.from.clone(), - last_ping: None, - last_got_ping: None, - call_me_maybe_time: None, - recent_pong: Some(r), - last_payload_msg: None, - sources: HashMap::new(), - } - } - /// Check whether this path is considered active. /// /// Active means the path has received payload messages within the last @@ -158,11 +92,6 @@ impl PathState { .unwrap_or(false) } - /// Returns the instant the last incoming ping was received. - pub(super) fn last_incoming_ping(&self) -> Option<&Instant> { - self.last_got_ping.as_ref().map(|(time, _tx_id)| time) - } - /// Reports the last instant this path was considered alive. /// /// Alive means the path is considered in use by the remote endpoint. Either because we @@ -175,13 +104,10 @@ impl PathState { /// - When the last payload transmission occurred. /// - when the last ping from them was received. pub(super) fn last_alive(&self) -> Option { - self.recent_pong + self.last_payload_msg .as_ref() - .map(|pong| &pong.pong_at) .into_iter() - .chain(self.last_payload_msg.as_ref()) .chain(self.call_me_maybe_time.as_ref()) - .chain(self.last_incoming_ping()) .max() .copied() } @@ -189,97 +115,29 @@ impl PathState { /// The last control or DISCO message **about** this path. /// /// This is the most recent instant among: - /// - when last pong was received. /// - when this path was last advertised in a received CallMeMaybe message. /// - when the last ping from them was received. /// /// Returns the time elapsed since the last control message, and the type of control message. pub(super) fn last_control_msg(&self, now: Instant) -> Option<(Duration, ControlMsg)> { // get every control message and assign it its kind - let last_pong = self - .recent_pong - .as_ref() - .map(|pong| (pong.pong_at, ControlMsg::Pong)); let last_call_me_maybe = self .call_me_maybe_time .as_ref() .map(|call_me| (*call_me, ControlMsg::CallMeMaybe)); - let last_ping = self - .last_incoming_ping() - .map(|ping| (*ping, ControlMsg::Ping)); - last_pong + last_call_me_maybe .into_iter() - .chain(last_call_me_maybe) - .chain(last_ping) .max_by_key(|(instant, _kind)| *instant) .map(|(instant, kind)| (now.duration_since(instant), kind)) } - /// Returns the latency from the most recent pong, if available. - pub(super) fn latency(&self) -> Option { - self.recent_pong.as_ref().map(|p| p.latency) - } - - pub(super) fn needs_ping(&self, now: &Instant) -> bool { - match self.last_ping { - None => true, - Some(last_ping) => { - let elapsed = now.duration_since(last_ping); - - // TODO: remove! - // This logs "ping is too new" for each send whenever the endpoint does *not* need - // a ping. Pretty sure this is not a useful log, but maybe there was a reason? - // if !needs_ping { - // debug!("ping is too new: {}ms", elapsed.as_millis()); - // } - elapsed > DISCO_PING_INTERVAL - } - } - } - - pub(super) fn handle_ping(&mut self, tx_id: stun_rs::TransactionId, now: Instant) -> PingRole { - if Some(&tx_id) == self.last_got_ping.as_ref().map(|(_t, tx_id)| tx_id) { - PingRole::Duplicate - } else { - let prev = self.last_got_ping.replace((now, tx_id)); - let heartbeat_deadline = HEARTBEAT_INTERVAL + (HEARTBEAT_INTERVAL / 2); - match prev { - Some((prev_time, _tx)) if now.duration_since(prev_time) <= heartbeat_deadline => { - PingRole::LikelyHeartbeat - } - Some((prev_time, _tx)) => { - debug!( - elapsed = ?now.duration_since(prev_time), - "heartbeat missed, reactivating", - ); - PingRole::Activate - } - None => { - if let SendAddr::Udp(ref addr) = self.path { - event!( - target: "iroh::_events::holepunched", - Level::DEBUG, - remote_node = %self.node_id.fmt_short(), - path = ?addr, - direction = "incoming", - ); - } - PingRole::Activate - } - } - } - } - pub(super) fn add_source(&mut self, source: Source, now: Instant) { self.sources.insert(source, now); } pub(super) fn clear(&mut self) { - self.last_ping = None; - self.last_got_ping = None; self.call_me_maybe_time = None; - self.recent_pong = None; } fn summary(&self, mut w: impl std::fmt::Write) -> std::fmt::Result { @@ -287,15 +145,6 @@ impl PathState { if self.is_active() { write!(w, "active ")?; } - if let Some(ref pong) = self.recent_pong { - write!(w, "pong-received({:?} ago) ", pong.pong_at.elapsed())?; - } - if let Some(when) = self.last_incoming_ping() { - write!(w, "ping-received({:?} ago) ", when.elapsed())?; - } - if let Some(ref when) = self.last_ping { - write!(w, "ping-sent({:?} ago) ", when.elapsed())?; - } if let Some(last_source) = self.sources.iter().max_by_key(|&(_, instant)| instant) { write!( w, diff --git a/iroh/src/magicsock/node_map/udp_paths.rs b/iroh/src/magicsock/node_map/udp_paths.rs index 0d96c3bba0b..213dee522e8 100644 --- a/iroh/src/magicsock/node_map/udp_paths.rs +++ b/iroh/src/magicsock/node_map/udp_paths.rs @@ -9,15 +9,12 @@ use std::{collections::BTreeMap, net::SocketAddr}; use n0_future::time::{Duration, Instant}; use rand::seq::IteratorRandom; -use tracing::warn; use super::{ best_addr::{self, BestAddr}, - node_state::PongReply, path_state::PathState, IpPort, }; -use crate::disco::SendAddr; /// The address on which to send datagrams over UDP. /// @@ -91,6 +88,9 @@ impl NodeUdpPaths { chosen_candidate: None, } } + pub(super) fn addrs(&self) -> Vec { + self.paths.keys().map(|ip| (*ip).into()).collect() + } /// Returns the current UDP address to send on. /// @@ -147,34 +147,33 @@ impl NodeUdpPaths { // The highest acceptable latency for an endpoint path. If the latency is higher // then this the path will be ignored. const MAX_LATENCY: Duration = Duration::from_secs(60 * 60); - let best_pong = self.paths.iter().fold(None, |best_pong, (ipp, state)| { - let best_latency = best_pong - .map(|p: &PongReply| p.latency) - .unwrap_or(MAX_LATENCY); - match state.recent_pong { - // This pong is better if it has a lower latency, or if it has the same - // latency but on an IPv6 path. - Some(ref pong) - if pong.latency < best_latency - || (pong.latency == best_latency && ipp.ip().is_ipv6()) => - { - Some(pong) - } - _ => best_pong, - } - }); + + // let best_pong = self.paths.iter().fold(None, |best_pong, (ipp, state)| { + // let best_latency = todo!(); + // match state.recent_pong { + // // This pong is better if it has a lower latency, or if it has the same + // // latency but on an IPv6 path. + // Some(ref pong) + // if pong.latency < best_latency + // || (pong.latency == best_latency && ipp.ip().is_ipv6()) => + // { + // Some(pong) + // } + // _ => best_pong, + // } + // }); // If we found a candidate, set to best addr - if let Some(pong) = best_pong { - if let SendAddr::Udp(addr) = pong.from { - warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); - self.best_addr.insert_if_better_or_reconfirm( - addr, - pong.latency, - best_addr::Source::BestCandidate, - pong.pong_at, - ) - } - } + // if let Some(pong) = best_pong { + // if let SendAddr::Udp(addr) = pong.from { + // warn!(%addr, "No best_addr was set, choose candidate with lowest latency"); + // self.best_addr.insert_if_better_or_reconfirm( + // addr, + // pong.latency, + // best_addr::Source::BestCandidate, + // pong.pong_at, + // ) + // } + // } } } diff --git a/iroh/src/net_report/ip_mapped_addrs.rs b/iroh/src/magicsock/relay_mapped_addrs.rs similarity index 65% rename from iroh/src/net_report/ip_mapped_addrs.rs rename to iroh/src/magicsock/relay_mapped_addrs.rs index d555d8b506d..1ac1eaabacc 100644 --- a/iroh/src/net_report/ip_mapped_addrs.rs +++ b/iroh/src/magicsock/relay_mapped_addrs.rs @@ -7,6 +7,7 @@ use std::{ }, }; +use iroh_base::{NodeId, RelayUrl}; use snafu::Snafu; /// Can occur when converting a [`SocketAddr`] to an [`IpMappedAddr`] @@ -38,7 +39,7 @@ impl IpMappedAddr { /// /// This generates a new IPv6 address in the Unique Local Address range (RFC 4193) /// which is recognised by iroh as an IP mapped address. - pub(super) fn generate() -> Self { + pub(crate) fn generate() -> Self { let mut addr = [0u8; 16]; addr[0] = Self::ADDR_PREFIXL; addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID); @@ -83,52 +84,50 @@ impl std::fmt::Display for IpMappedAddr { } } -/// A Map of [`IpMappedAddresses`] to [`SocketAddr`]. -// TODO(ramfox): before this is ready to be used beyond QAD, we should add -// mechanisms for keeping track of "aliveness" and pruning address, as we do -// with the `NodeMap` +/// Can occur when converting a [`SocketAddr`] to an [`RelayMappedAddr`] +#[derive(Debug, Snafu)] +#[snafu(display("Failed to convert"))] +pub struct RelayMappedAddrError; + +/// A Map of [`RelayMappedAddresses`] to [`SocketAddr`]. #[derive(Debug, Clone, Default)] -pub(crate) struct IpMappedAddresses(Arc>); +pub(crate) struct RelayMappedAddresses(Arc>); #[derive(Debug, Default)] pub(super) struct Inner { - by_mapped_addr: BTreeMap, - /// Because [`std::net::SocketAddrV6`] contains extra fields besides the IP - /// address and port (ie, flow_info and scope_id), the a [`std::net::SocketAddrV6`] - /// with the same IP addr and port might Hash to something different. - /// So to get a hashable key for the map, we are using `(IpAddr, u6)`. - by_ip_port: BTreeMap<(IpAddr, u16), IpMappedAddr>, + by_mapped_addr: BTreeMap, + by_url: BTreeMap<(RelayUrl, NodeId), IpMappedAddr>, } -impl IpMappedAddresses { - /// Adds a [`SocketAddr`] to the map and returns the generated [`IpMappedAddr`]. +impl RelayMappedAddresses { + /// Adds a [`RelayUrl`] to the map and returns the generated [`IpMappedAddr`]. /// - /// If this [`SocketAddr`] already exists in the map, it returns its + /// If this [`RelayUrl`] already exists in the map, it returns its /// associated [`IpMappedAddr`]. /// /// Otherwise a new [`IpMappedAddr`] is generated for it and returned. - pub(super) fn get_or_register(&self, socket_addr: SocketAddr) -> IpMappedAddr { - let ip_port = (socket_addr.ip(), socket_addr.port()); + pub(super) fn get_or_register(&self, relay: RelayUrl, node: NodeId) -> IpMappedAddr { let mut inner = self.0.lock().expect("poisoned"); - if let Some(mapped_addr) = inner.by_ip_port.get(&ip_port) { + if let Some(mapped_addr) = inner.by_url.get(&(relay.clone(), node)) { return *mapped_addr; } let ip_mapped_addr = IpMappedAddr::generate(); - inner.by_mapped_addr.insert(ip_mapped_addr, socket_addr); - inner.by_ip_port.insert(ip_port, ip_mapped_addr); + inner + .by_mapped_addr + .insert(ip_mapped_addr, (relay.clone(), node)); + inner.by_url.insert((relay, node), ip_mapped_addr); ip_mapped_addr } - /// Returns the [`IpMappedAddr`] for the given [`SocketAddr`]. - pub(crate) fn get_mapped_addr(&self, socket_addr: &SocketAddr) -> Option { - let ip_port = (socket_addr.ip(), socket_addr.port()); + /// Returns the [`IpMappedAddr`] for the given [`RelayUrl`]. + pub(crate) fn get_mapped_addr(&self, relay: RelayUrl, node: NodeId) -> Option { let inner = self.0.lock().expect("poisoned"); - inner.by_ip_port.get(&ip_port).copied() + inner.by_url.get(&(relay, node)).copied() } - /// Returns the [`SocketAddr`] for the given [`IpMappedAddr`]. - pub(crate) fn get_ip_addr(&self, mapped_addr: &IpMappedAddr) -> Option { + /// Returns the [`RelayUrl`] for the given [`IpMappedAddr`]. + pub(crate) fn get_url(&self, mapped_addr: &IpMappedAddr) -> Option<(RelayUrl, NodeId)> { let inner = self.0.lock().expect("poisoned"); - inner.by_mapped_addr.get(mapped_addr).copied() + inner.by_mapped_addr.get(mapped_addr).cloned() } } diff --git a/iroh/src/magicsock/transports/relay.rs b/iroh/src/magicsock/transports/relay.rs index 56a9da4cd23..ed57776ead4 100644 --- a/iroh/src/magicsock/transports/relay.rs +++ b/iroh/src/magicsock/transports/relay.rs @@ -106,7 +106,7 @@ impl RelayTransport { .segment_size .map_or(dm.datagrams.contents.len(), |s| s as usize); meta_out.ecn = None; - meta_out.dst_ip = None; // TODO: insert the relay url for this relay + meta_out.dst_ip = None; *addr = (dm.url, dm.src).into(); num_msgs += 1; diff --git a/iroh/src/net_report.rs b/iroh/src/net_report.rs index 857e0e3f1f3..c01fd42fcda 100644 --- a/iroh/src/net_report.rs +++ b/iroh/src/net_report.rs @@ -46,7 +46,6 @@ use self::reportgen::QadProbeReport; use self::reportgen::{ProbeFinished, ProbeReport}; mod defaults; -mod ip_mapped_addrs; mod metrics; mod probes; mod report; @@ -73,8 +72,6 @@ pub(crate) mod portmapper { } } -pub(crate) use ip_mapped_addrs::{IpMappedAddr, IpMappedAddresses}; - pub(crate) use self::reportgen::IfStateDetails; #[cfg(not(wasm_browser))] use self::reportgen::SocketState; @@ -215,7 +212,6 @@ impl Client { /// Creates a new net_report client. pub(crate) fn new( #[cfg(not(wasm_browser))] dns_resolver: DnsResolver, - #[cfg(not(wasm_browser))] ip_mapped_addrs: Option, relay_map: RelayMap, opts: Options, metrics: Arc, @@ -233,7 +229,6 @@ impl Client { let socket_state = SocketState { quic_client, dns_resolver, - ip_mapped_addrs, }; Client { @@ -438,7 +433,6 @@ impl Client { for relay_node in self.relay_map.nodes().take(MAX_RELAYS) { if if_state.have_v4 { debug!(?relay_node.url, "v4 QAD probe"); - let ip_mapped_addrs = self.socket_state.ip_mapped_addrs.clone(); let relay_node = relay_node.clone(); let dns_resolver = self.socket_state.dns_resolver.clone(); let quic_client = quic_client.clone(); @@ -448,7 +442,7 @@ impl Client { .child_token() .run_until_cancelled_owned(time::timeout( PROBES_TIMEOUT, - run_probe_v4(ip_mapped_addrs, relay_node, quic_client, dns_resolver), + run_probe_v4(relay_node, quic_client, dns_resolver), )) .instrument(info_span!("QAD IPv6", %relay_url)), ); @@ -456,7 +450,6 @@ impl Client { if if_state.have_v6 { debug!(?relay_node.url, "v6 QAD probe"); - let ip_mapped_addrs = self.socket_state.ip_mapped_addrs.clone(); let relay_node = relay_node.clone(); let dns_resolver = self.socket_state.dns_resolver.clone(); let quic_client = quic_client.clone(); @@ -466,7 +459,7 @@ impl Client { .child_token() .run_until_cancelled_owned(time::timeout( PROBES_TIMEOUT, - run_probe_v6(ip_mapped_addrs, relay_node, quic_client, dns_resolver), + run_probe_v6(relay_node, quic_client, dns_resolver), )) .instrument(info_span!("QAD IPv6", %relay_url)), ); @@ -671,20 +664,17 @@ impl Client { #[cfg(not(wasm_browser))] async fn run_probe_v4( - ip_mapped_addrs: Option, relay_node: Arc, quic_client: QuicClient, dns_resolver: DnsResolver, ) -> n0_snafu::Result<(QadProbeReport, QadConn)> { use n0_snafu::ResultExt; - let relay_addr_orig = reportgen::get_relay_addr_ipv4(&dns_resolver, &relay_node).await?; - let relay_addr = - reportgen::maybe_to_mapped_addr(ip_mapped_addrs.as_ref(), relay_addr_orig.into()); + let relay_addr = reportgen::get_relay_addr_ipv4(&dns_resolver, &relay_node).await?; - debug!(?relay_addr_orig, ?relay_addr, "relay addr v4"); + debug!(?relay_addr, "relay addr v4"); let host = relay_node.url.host_str().context("missing host url")?; - let conn = quic_client.create_conn(relay_addr, host).await?; + let conn = quic_client.create_conn(relay_addr.into(), host).await?; let mut receiver = conn.observed_external_addr(); // wait for an addr @@ -741,19 +731,16 @@ async fn run_probe_v4( #[cfg(not(wasm_browser))] async fn run_probe_v6( - ip_mapped_addrs: Option, relay_node: Arc, quic_client: QuicClient, dns_resolver: DnsResolver, ) -> n0_snafu::Result<(QadProbeReport, QadConn)> { use n0_snafu::ResultExt; - let relay_addr_orig = reportgen::get_relay_addr_ipv6(&dns_resolver, &relay_node).await?; - let relay_addr = - reportgen::maybe_to_mapped_addr(ip_mapped_addrs.as_ref(), relay_addr_orig.into()); + let relay_addr = reportgen::get_relay_addr_ipv6(&dns_resolver, &relay_node).await?; - debug!(?relay_addr_orig, ?relay_addr, "relay addr v6"); + debug!(?relay_addr, "relay addr v6"); let host = relay_node.url.host_str().context("missing host url")?; - let conn = quic_client.create_conn(relay_addr, host).await?; + let conn = quic_client.create_conn(relay_addr.into(), host).await?; let mut receiver = conn.observed_external_addr(); // wait for an addr @@ -878,7 +865,6 @@ mod tests { .insecure_skip_relay_cert_verify(true); let mut client = Client::new( resolver.clone(), - None, relay_map.clone(), opts.clone(), Default::default(), @@ -1079,8 +1065,7 @@ mod tests { println!("test: {}", tt.name); let relay_map = RelayMap::empty(); let opts = Options::default(); - let mut client = - Client::new(resolver.clone(), None, relay_map, opts, Default::default()); + let mut client = Client::new(resolver.clone(), relay_map, opts, Default::default()); for s in &mut tt.steps { // trigger the timer tokio::time::advance(Duration::from_secs(s.after)).await; diff --git a/iroh/src/net_report/reportgen.rs b/iroh/src/net_report/reportgen.rs index 06e46f1e381..57719c77dc5 100644 --- a/iroh/src/net_report/reportgen.rs +++ b/iroh/src/net_report/reportgen.rs @@ -45,10 +45,10 @@ use tokio_util::sync::CancellationToken; use tracing::{debug, debug_span, error, info_span, trace, warn, Instrument}; use url::Host; +#[cfg(not(wasm_browser))] +use super::defaults::timeouts::DNS_TIMEOUT; #[cfg(wasm_browser)] use super::portmapper; // We stub the library -#[cfg(not(wasm_browser))] -use super::{defaults::timeouts::DNS_TIMEOUT, ip_mapped_addrs::IpMappedAddresses}; use super::{ probes::{Probe, ProbePlan}, Report, @@ -105,8 +105,6 @@ pub(crate) struct SocketState { pub(crate) quic_client: Option, /// The DNS resolver to use for probes that need to resolve DNS records. pub(crate) dns_resolver: DnsResolver, - /// Optional [`IpMappedAddresses`] used to enable QAD in iroh - pub(crate) ip_mapped_addrs: Option, } impl Client { @@ -518,17 +516,6 @@ impl Probe { } } -#[cfg(not(wasm_browser))] -pub(super) fn maybe_to_mapped_addr( - ip_mapped_addrs: Option<&IpMappedAddresses>, - addr: SocketAddr, -) -> SocketAddr { - if let Some(ip_mapped_addrs) = ip_mapped_addrs { - return ip_mapped_addrs.get_or_register(addr).private_socket_addr(); - } - addr -} - #[cfg(not(wasm_browser))] #[derive(Debug, Snafu)] #[snafu(module)] @@ -874,7 +861,7 @@ mod tests { let quic_client = iroh_relay::quic::QuicClient::new(ep.clone(), client_config); let dns_resolver = DnsResolver::default(); - let (report, conn) = super::super::run_probe_v4(None, relay, quic_client, dns_resolver) + let (report, conn) = super::super::run_probe_v4(relay, quic_client, dns_resolver) .await .unwrap();