diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 20e241e3e0b..f5301bb9b49 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -3216,4 +3216,97 @@ mod tests { Ok(()) } + + /// Test that we can immediately reconnect after respawning an endpoint with the same node id. + /// + /// Importantly, this test does not use a relay. This means there is no other path but the UDP + /// path available. The respawned endpoint will have different direct addrs from the previous + /// endpoint. The test attempts to ensure that this works, i.e. that the server endpoint will + /// reply to the new addresses and not keep sending to the old UDP addresses which won't be + /// received anymore. + #[tokio::test] + #[traced_test] + async fn can_abort_and_reconnect() -> Result { + const TEST_ALPN: &[u8] = b"/iroh/test/1"; + const TIMEOUT: Duration = Duration::from_secs(10); + + let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1); + + // Spawn a server endpoint. + let server = Endpoint::builder() + .secret_key(SecretKey::generate(&mut rng)) + .relay_mode(RelayMode::Disabled) + .alpns(vec![TEST_ALPN.to_vec()]) + .bind() + .await?; + let server_addr = server.node_addr().initialized().await.e()?; + + // The server accepts all connections, waits for them being closed, and sends the + // close code over a channel. + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let server_loop = tokio::task::spawn(async move { + while let Some(conn) = server.accept().await { + let conn = conn.accept().e()?.await.e()?; + info!("server ACCEPT"); + let mut stream = conn.open_uni().await.e()?; + stream.write_all(b"hi").await.e()?; + stream.finish().e()?; + let res = match conn.closed().await { + ConnectionError::ApplicationClosed(frame) => Ok(u64::from(frame.error_code)), + reason => Err(reason), + }; + info!("server CLOSED"); + tx.send(res).await.e()?; + } + Result::<_, n0_snafu::Error>::Ok(()) + }); + + // Clients connect to the server, and immediately close the connection with a code + // and then close the endpoint. + async fn connect(secret_key: SecretKey, addr: NodeAddr, code: u32) -> Result { + info!("spawn client node {}", secret_key.public().fmt_short()); + let ep = Endpoint::builder() + .secret_key(secret_key) + .relay_mode(RelayMode::Disabled) + .bind() + .await?; + let ipv4 = ep.bound_sockets()[0]; + let node_id = ep.node_id().fmt_short(); + info!(%node_id, %ipv4, "client CONNECT"); + let conn = ep.connect(addr, TEST_ALPN).await?; + info!(%node_id, %ipv4, "client CONNECTED"); + let mut stream = conn.accept_uni().await.e()?; + let buf = stream.read_to_end(2).await.e()?; + assert_eq!(&buf, b"hi"); + conn.close(code.into(), b"bye"); + info!(%node_id, %ipv4, "client CLOSE"); + Ok(ep) + } + + let client_secret_key = SecretKey::generate(&mut rng); + + // First connection + let ep = n0_future::time::timeout( + TIMEOUT, + connect(client_secret_key.clone(), server_addr.clone(), 23), + ) + .await + .e()??; + assert_eq!(rx.recv().await.unwrap().unwrap(), 23); + ep.close().await; + + // Second connection + let ep = n0_future::time::timeout( + TIMEOUT, + connect(client_secret_key.clone(), server_addr.clone(), 24), + ) + .await + .e()??; + assert_eq!(rx.recv().await.unwrap().unwrap(), 24); + ep.close().await; + + server_loop.abort(); + + Ok(()) + } } diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs index 48866e27813..1f4f7506f9a 100644 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ b/iroh/src/magicsock/node_map/best_addr.rs @@ -121,11 +121,19 @@ impl BestAddr { } Some(state) => { let candidate = AddrLatency { addr, latency }; + // If the current address has exceeded its trust interval, or if the new address has lower latency, + // use the new address. if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) { self.insert(addr, latency, source, confirmed_at); + // If the new address is equal to the current address, mark it as reconfirmed now and expand its trust + // interval. } else if state.addr.addr == addr { state.confirmed_at = confirmed_at; state.trust_until = Some(source.trust_until(confirmed_at)); + // If we receive a pong on a different port but the same IP address as the current best addr, + // we assume that the endpoint has rebound, and thus use the new port. + } else if state.addr.addr.ip() == addr.ip() { + self.insert(addr, latency, source, confirmed_at) } } } diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index be1e0a58dbe..13e5c0264f6 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -789,12 +789,13 @@ impl NodeState { self.prune_direct_addresses(); } - // if the endpoint does not yet have a best_addrr + // if the endpoint does not yet have a best_addr, or if this is a new path let needs_ping_back = if matches!(path, SendAddr::Udp(_)) - && matches!( + && (matches!( self.udp_paths.best_addr.state(now), best_addr::State::Empty | best_addr::State::Outdated(_) - ) { + ) || matches!(role, PingRole::NewPath)) + { // We also need to send a ping to make this path available to us as well. This // is always sent together with a pong. So in the worst case the pong gets lost // and this ping does not. In that case we ping-pong until both sides have