From 14c6d1dde55febd67506eadce440a01c0f441d7c Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 27 Jun 2025 10:10:29 +0200 Subject: [PATCH 1/5] tests(iroh): add test to reconnect after forceful abort --- iroh/tests/reconnect.rs | 176 ++++++++++++++++++++++++++++++++++++++++ 1 file changed, 176 insertions(+) create mode 100644 iroh/tests/reconnect.rs diff --git a/iroh/tests/reconnect.rs b/iroh/tests/reconnect.rs new file mode 100644 index 00000000000..6e624b32fc6 --- /dev/null +++ b/iroh/tests/reconnect.rs @@ -0,0 +1,176 @@ +use std::time::Duration; + +use iroh::{ + endpoint::{BindError, Connection}, + protocol::{AcceptError, ProtocolHandler, Router}, + Endpoint, NodeAddr, NodeId, RelayMap, RelayMode, SecretKey, Watcher, +}; +use n0_future::time::timeout; +use n0_snafu::{Result, ResultExt}; +use rand::SeedableRng; +use tokio::sync::mpsc; +use tokio_util::sync::CancellationToken; +use tracing::info; + +#[cfg(feature = "test-utils")] +#[tokio::test] +// #[traced_test] +async fn can_die_and_reconnect() -> Result { + tracing_subscriber::fmt::init(); + + let max_wait = Duration::from_secs(5); + + #[derive(Debug, Clone)] + struct TestProtocol(mpsc::Sender<(NodeId, String)>); + + const TEST_ALPN: &[u8] = b"/iroh/test/1"; + + impl ProtocolHandler for TestProtocol { + async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { + let remote_node_id = connection.remote_node_id()?; + let mut stream = connection.accept_uni().await?; + let data = stream + .read_to_end(64) + .await + .map_err(AcceptError::from_err)?; + let s = String::from_utf8(data).map_err(AcceptError::from_err)?; + self.0 + .send((remote_node_id, s)) + .await + .map_err(AcceptError::from_err)?; + Ok(()) + } + } + + /// Runs a future in a separate runtime on a separate thread, cancelling everything + /// abruptly once `cancel` is invoked. 
+ fn run_in_thread( + cancel: CancellationToken, + fut: impl std::future::Future + Send + 'static, + ) -> std::thread::JoinHandle> { + std::thread::spawn(move || { + let rt = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build() + .unwrap(); + rt.block_on(async move { cancel.run_until_cancelled(fut).await }) + }) + } + + /// Spawns a new client endpoint + async fn spawn_client( + secret_key: SecretKey, + relay_map: RelayMap, + ) -> Result { + let ep = Endpoint::builder() + .secret_key(secret_key) + .relay_mode(RelayMode::Custom(relay_map)) + .insecure_skip_relay_cert_verify(true) + .bind() + .await?; + Ok(ep) + } + + /// Spawn a server endpoint, sending incoming messages on `tx`. + async fn spawn_server( + secret_key: SecretKey, + relay_map: RelayMap, + tx: mpsc::Sender<(NodeId, String)>, + ) -> Result { + let ep = Endpoint::builder() + .secret_key(secret_key) + .relay_mode(RelayMode::Custom(relay_map)) + .insecure_skip_relay_cert_verify(true) + .bind() + .await?; + let router = Router::builder(ep) + .accept(TEST_ALPN, TestProtocol(tx)) + .spawn(); + Ok(router) + } + + /// Binds an endpoint, connects to `server_addr`, sends a message, and then do nothing until aborted externally. 
+ async fn connect_once( + secret_key: SecretKey, + relay_map: RelayMap, + server_addr: NodeAddr, + msg: String, + ) -> Result { + let endpoint = spawn_client(secret_key, relay_map).await?; + info!(node_id = %endpoint.node_id().fmt_short(), "client node spawned"); + let conn = endpoint.connect(server_addr, TEST_ALPN).await?; + let mut stream = conn.open_uni().await.e()?; + stream.write_all(msg.as_bytes()).await.e()?; + stream.finish().e()?; + std::future::pending::<()>().await; + Ok(()) + } + + let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap(); + let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1); + + let (addr_tx, addr_rx) = tokio::sync::oneshot::channel(); + let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3); + let recv_task = tokio::task::spawn({ + let relay_map = relay_map.clone(); + let secret_key = SecretKey::generate(&mut rng); + async move { + let router = spawn_server(secret_key, relay_map, msgs_recv_tx).await?; + let addr = router.endpoint().node_addr().initialized().await?; + info!(node_id = %addr.node_id.fmt_short(), "server node spawned"); + addr_tx.send(addr).unwrap(); + std::future::pending::<()>().await; + Result::<_, n0_snafu::Error>::Ok(()) + } + }); + + let server_addr = addr_rx.await.e()?; + + // spawn a node, send a message, and then abruptly terminate the node ungracefully + // after the message was received on our receiver node. + let cancel = CancellationToken::new(); + let client_secret_key = SecretKey::generate(&mut rng); + info!("spawn client node"); + let join_handle_1 = run_in_thread( + cancel.clone(), + connect_once( + client_secret_key.clone(), + relay_map.clone(), + server_addr.clone(), + "msg1".to_string(), + ), + ); + // assert that we received the message on the receiver node. 
+ let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap(); + assert_eq!(msg.0, client_secret_key.public()); + assert_eq!(&msg.1, "msg1"); + info!("kill client node"); + cancel.cancel(); + + // spawns the node again with the same node id, and send another message + let cancel = CancellationToken::new(); + info!("respawn client node"); + let join_handle_2 = run_in_thread( + cancel.clone(), + connect_once( + client_secret_key.clone(), + relay_map.clone(), + server_addr.clone(), + "msg2".to_string(), + ), + ); + // assert that we received the message on the server node. + // this means that the reconnect with the same node id worked. + let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap(); + assert_eq!(msg.0, client_secret_key.public()); + assert_eq!(&msg.1, "msg2"); + info!("kill client node"); + cancel.cancel(); + + info!("kill recv node"); + recv_task.abort(); + assert!(join_handle_1.join().unwrap().is_none()); + assert!(join_handle_2.join().unwrap().is_none()); + + Ok(()) +} From 9866f0ec666cde84cb4208cc63a0304e3121bcb9 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 27 Jun 2025 11:12:30 +0200 Subject: [PATCH 2/5] tests: simplify abort-and-reconnect test --- iroh/src/endpoint.rs | 81 ++++++++++++++++++ iroh/tests/reconnect.rs | 176 ---------------------------------------- 2 files changed, 81 insertions(+), 176 deletions(-) delete mode 100644 iroh/tests/reconnect.rs diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 20e241e3e0b..674a99544e5 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -3216,4 +3216,85 @@ mod tests { Ok(()) } + + /// Test that we can immediately reconnect after respawning an endpoint with the same node id. + #[tokio::test] + #[traced_test] + async fn can_abort_and_reconnect() -> Result { + const TEST_ALPN: &[u8] = b"/iroh/test/1"; + const TIMEOUT: Duration = Duration::from_secs(5); + + let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1); + + // Spawn a server endpoint. 
+ let server = Endpoint::builder() + .secret_key(SecretKey::generate(&mut rng)) + .relay_mode(RelayMode::Disabled) + .alpns(vec![TEST_ALPN.to_vec()]) + .bind() + .await?; + let server_addr = server.node_addr().initialized().await.e()?; + + // The server accepts all connections, waits for them to be closed, and sends the + // close code over a channel. + let (tx, mut rx) = tokio::sync::mpsc::channel(1); + let server_loop = tokio::task::spawn(async move { + while let Some(conn) = server.accept().await { + let conn = conn.accept().e()?.await.e()?; + let res = match conn.closed().await { + ConnectionError::ApplicationClosed(frame) => Ok(u64::from(frame.error_code)), + reason => Err(reason), + }; + tx.send(res).await.e()?; + } + Result::<_, n0_snafu::Error>::Ok(()) + }); + + // Clients connect to the server, and immediately close the connection with a code + // and then close the endpoint. + async fn connect(secret_key: SecretKey, addr: NodeAddr, code: u32) -> Result { + info!("spawn client node {}", secret_key.public().fmt_short()); + let ep = Endpoint::builder() + .secret_key(secret_key) + .relay_mode(RelayMode::Disabled) + .bind() + .await?; + info!( + "connect client {} ({:?}) to {addr:?}", + ep.node_id().fmt_short(), + ep.bound_sockets(), + ); + let conn = ep.connect(addr, TEST_ALPN).await?; + conn.close(code.into(), b"bye"); + Ok(ep) + } + + let client_secret_key = SecretKey::generate(&mut rng); + + // First connection + let ep = n0_future::time::timeout( + TIMEOUT, + connect(client_secret_key.clone(), server_addr.clone(), 23), + ) + .await + .e()??; + assert_eq!(rx.recv().await.unwrap().unwrap(), 23); + // close the endpoint in a separate task, to not lose time for our immediate respawn testing + let close1 = tokio::task::spawn(async move { ep.close().await }); + + // Second connection + let ep = n0_future::time::timeout( + TIMEOUT, + connect(client_secret_key.clone(), server_addr.clone(), 24), + ) + .await + .e()??; + 
assert_eq!(rx.recv().await.unwrap().unwrap(), 24); + + close1.await.e()?; + ep.close().await; + server_loop.abort(); + + Ok(()) + } } diff --git a/iroh/tests/reconnect.rs b/iroh/tests/reconnect.rs deleted file mode 100644 index 6e624b32fc6..00000000000 --- a/iroh/tests/reconnect.rs +++ /dev/null @@ -1,176 +0,0 @@ -use std::time::Duration; - -use iroh::{ - endpoint::{BindError, Connection}, - protocol::{AcceptError, ProtocolHandler, Router}, - Endpoint, NodeAddr, NodeId, RelayMap, RelayMode, SecretKey, Watcher, -}; -use n0_future::time::timeout; -use n0_snafu::{Result, ResultExt}; -use rand::SeedableRng; -use tokio::sync::mpsc; -use tokio_util::sync::CancellationToken; -use tracing::info; - -#[cfg(feature = "test-utils")] -#[tokio::test] -// #[traced_test] -async fn can_die_and_reconnect() -> Result { - tracing_subscriber::fmt::init(); - - let max_wait = Duration::from_secs(5); - - #[derive(Debug, Clone)] - struct TestProtocol(mpsc::Sender<(NodeId, String)>); - - const TEST_ALPN: &[u8] = b"/iroh/test/1"; - - impl ProtocolHandler for TestProtocol { - async fn accept(&self, connection: Connection) -> Result<(), AcceptError> { - let remote_node_id = connection.remote_node_id()?; - let mut stream = connection.accept_uni().await?; - let data = stream - .read_to_end(64) - .await - .map_err(AcceptError::from_err)?; - let s = String::from_utf8(data).map_err(AcceptError::from_err)?; - self.0 - .send((remote_node_id, s)) - .await - .map_err(AcceptError::from_err)?; - Ok(()) - } - } - - /// Runs a future in a separate runtime on a separate thread, cancelling everything - /// abruptly once `cancel` is invoked. 
- fn run_in_thread( - cancel: CancellationToken, - fut: impl std::future::Future + Send + 'static, - ) -> std::thread::JoinHandle> { - std::thread::spawn(move || { - let rt = tokio::runtime::Builder::new_current_thread() - .enable_all() - .build() - .unwrap(); - rt.block_on(async move { cancel.run_until_cancelled(fut).await }) - }) - } - - /// Spawns a new client endpoint - async fn spawn_client( - secret_key: SecretKey, - relay_map: RelayMap, - ) -> Result { - let ep = Endpoint::builder() - .secret_key(secret_key) - .relay_mode(RelayMode::Custom(relay_map)) - .insecure_skip_relay_cert_verify(true) - .bind() - .await?; - Ok(ep) - } - - /// Spawn a server endpoint, sending incoming messages on `tx`. - async fn spawn_server( - secret_key: SecretKey, - relay_map: RelayMap, - tx: mpsc::Sender<(NodeId, String)>, - ) -> Result { - let ep = Endpoint::builder() - .secret_key(secret_key) - .relay_mode(RelayMode::Custom(relay_map)) - .insecure_skip_relay_cert_verify(true) - .bind() - .await?; - let router = Router::builder(ep) - .accept(TEST_ALPN, TestProtocol(tx)) - .spawn(); - Ok(router) - } - - /// Binds an endpoint, connects to `server_addr`, sends a message, and then do nothing until aborted externally. 
- async fn connect_once( - secret_key: SecretKey, - relay_map: RelayMap, - server_addr: NodeAddr, - msg: String, - ) -> Result { - let endpoint = spawn_client(secret_key, relay_map).await?; - info!(node_id = %endpoint.node_id().fmt_short(), "client node spawned"); - let conn = endpoint.connect(server_addr, TEST_ALPN).await?; - let mut stream = conn.open_uni().await.e()?; - stream.write_all(msg.as_bytes()).await.e()?; - stream.finish().e()?; - std::future::pending::<()>().await; - Ok(()) - } - - let (relay_map, _relay_url, _guard) = iroh::test_utils::run_relay_server().await.unwrap(); - let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1); - - let (addr_tx, addr_rx) = tokio::sync::oneshot::channel(); - let (msgs_recv_tx, mut msgs_recv_rx) = tokio::sync::mpsc::channel(3); - let recv_task = tokio::task::spawn({ - let relay_map = relay_map.clone(); - let secret_key = SecretKey::generate(&mut rng); - async move { - let router = spawn_server(secret_key, relay_map, msgs_recv_tx).await?; - let addr = router.endpoint().node_addr().initialized().await?; - info!(node_id = %addr.node_id.fmt_short(), "server node spawned"); - addr_tx.send(addr).unwrap(); - std::future::pending::<()>().await; - Result::<_, n0_snafu::Error>::Ok(()) - } - }); - - let server_addr = addr_rx.await.e()?; - - // spawn a node, send a message, and then abruptly terminate the node ungracefully - // after the message was received on our receiver node. - let cancel = CancellationToken::new(); - let client_secret_key = SecretKey::generate(&mut rng); - info!("spawn client node"); - let join_handle_1 = run_in_thread( - cancel.clone(), - connect_once( - client_secret_key.clone(), - relay_map.clone(), - server_addr.clone(), - "msg1".to_string(), - ), - ); - // assert that we received the message on the receiver node. 
- let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap(); - assert_eq!(msg.0, client_secret_key.public()); - assert_eq!(&msg.1, "msg1"); - info!("kill client node"); - cancel.cancel(); - - // spawns the node again with the same node id, and send another message - let cancel = CancellationToken::new(); - info!("respawn client node"); - let join_handle_2 = run_in_thread( - cancel.clone(), - connect_once( - client_secret_key.clone(), - relay_map.clone(), - server_addr.clone(), - "msg2".to_string(), - ), - ); - // assert that we received the message on the server node. - // this means that the reconnect with the same node id worked. - let msg = timeout(max_wait, msgs_recv_rx.recv()).await.e()?.unwrap(); - assert_eq!(msg.0, client_secret_key.public()); - assert_eq!(&msg.1, "msg2"); - info!("kill client node"); - cancel.cancel(); - - info!("kill recv node"); - recv_task.abort(); - assert!(join_handle_1.join().unwrap().is_none()); - assert!(join_handle_2.join().unwrap().is_none()); - - Ok(()) -} From 087f003b5f486697fe0b3c3ca6bcd9cbd350f50e Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 27 Jun 2025 11:36:17 +0200 Subject: [PATCH 3/5] fix(iroh): always ping new paths --- iroh/src/magicsock/node_map/node_state.rs | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/iroh/src/magicsock/node_map/node_state.rs b/iroh/src/magicsock/node_map/node_state.rs index be1e0a58dbe..13e5c0264f6 100644 --- a/iroh/src/magicsock/node_map/node_state.rs +++ b/iroh/src/magicsock/node_map/node_state.rs @@ -789,12 +789,13 @@ impl NodeState { self.prune_direct_addresses(); } - // if the endpoint does not yet have a best_addrr + // if the endpoint does not yet have a best_addr, or if this is a new path let needs_ping_back = if matches!(path, SendAddr::Udp(_)) - && matches!( + && (matches!( self.udp_paths.best_addr.state(now), best_addr::State::Empty | best_addr::State::Outdated(_) - ) { + ) || matches!(role, PingRole::NewPath)) + { // We also need to send a ping 
to make this path available to us as well. This // is always sent together with a pong. So in the worst case the pong gets lost // and this ping does not. In that case we ping-pong until both sides have From a00644abe59ae689d9d629a835f4d0a3d684fa42 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 27 Jun 2025 12:27:11 +0200 Subject: [PATCH 4/5] tests: improve reconnect test --- iroh/src/endpoint.rs | 32 ++++++++++++++++++++++---------- 1 file changed, 22 insertions(+), 10 deletions(-) diff --git a/iroh/src/endpoint.rs b/iroh/src/endpoint.rs index 674a99544e5..f5301bb9b49 100644 --- a/iroh/src/endpoint.rs +++ b/iroh/src/endpoint.rs @@ -3218,11 +3218,17 @@ mod tests { } /// Test that we can immediately reconnect after respawning an endpoint with the same node id. + /// + /// Importantly, this test does not use a relay. This means there is no other path but the UDP + /// path available. The respawned endpoint will have different direct addrs from the previous + /// endpoint. The test attempts to ensure that this works, i.e. that the server endpoint will + /// reply to the new addresses and not keep sending to the old UDP addresses which won't be + /// received anymore. 
#[tokio::test] #[traced_test] async fn can_abort_and_reconnect() -> Result { const TEST_ALPN: &[u8] = b"/iroh/test/1"; - const TIMEOUT: Duration = Duration::from_secs(5); + const TIMEOUT: Duration = Duration::from_secs(10); let mut rng = &mut rand_chacha::ChaCha12Rng::seed_from_u64(1); @@ -3241,10 +3247,15 @@ mod tests { let server_loop = tokio::task::spawn(async move { while let Some(conn) = server.accept().await { let conn = conn.accept().e()?.await.e()?; + info!("server ACCEPT"); + let mut stream = conn.open_uni().await.e()?; + stream.write_all(b"hi").await.e()?; + stream.finish().e()?; let res = match conn.closed().await { ConnectionError::ApplicationClosed(frame) => Ok(u64::from(frame.error_code)), reason => Err(reason), }; + info!("server CLOSED"); tx.send(res).await.e()?; } Result::<_, n0_snafu::Error>::Ok(()) @@ -3259,13 +3270,16 @@ mod tests { .relay_mode(RelayMode::Disabled) .bind() .await?; - info!( - "connect client {} ({:?}) to {addr:?}", - ep.node_id().fmt_short(), - ep.bound_sockets(), - ); + let ipv4 = ep.bound_sockets()[0]; + let node_id = ep.node_id().fmt_short(); + info!(%node_id, %ipv4, "client CONNECT"); let conn = ep.connect(addr, TEST_ALPN).await?; + info!(%node_id, %ipv4, "client CONNECTED"); + let mut stream = conn.accept_uni().await.e()?; + let buf = stream.read_to_end(2).await.e()?; + assert_eq!(&buf, b"hi"); conn.close(code.into(), b"bye"); + info!(%node_id, %ipv4, "client CLOSE"); Ok(ep) } @@ -3279,8 +3293,7 @@ mod tests { .await .e()??; assert_eq!(rx.recv().await.unwrap().unwrap(), 23); - // close the endpoint in a separate task, to not lose time for our immediate respawn testing - let close1 = tokio::task::spawn(async move { ep.close().await }); + ep.close().await; // Second connection let ep = n0_future::time::timeout( @@ -3290,9 +3303,8 @@ mod tests { .await .e()??; assert_eq!(rx.recv().await.unwrap().unwrap(), 24); - - close1.await.e()?; ep.close().await; + server_loop.abort(); Ok(()) From a8ee257e2f94c7b4ae0badd67b25513b98c3d46f 
Mon Sep 17 00:00:00 2001 From: Frando Date: Mon, 30 Jun 2025 11:22:48 +0200 Subject: [PATCH 5/5] fix(iroh): when ports change for the current best address, use it --- iroh/src/magicsock/node_map/best_addr.rs | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/iroh/src/magicsock/node_map/best_addr.rs b/iroh/src/magicsock/node_map/best_addr.rs index 48866e27813..1f4f7506f9a 100644 --- a/iroh/src/magicsock/node_map/best_addr.rs +++ b/iroh/src/magicsock/node_map/best_addr.rs @@ -121,11 +121,19 @@ impl BestAddr { } Some(state) => { let candidate = AddrLatency { addr, latency }; + // If the current address has exceeded its trust interval, or if the new address has lower latency, + // use the new address. if !state.is_trusted(confirmed_at) || candidate.is_better_than(&state.addr) { self.insert(addr, latency, source, confirmed_at); + // If the new address is equal to the current address, mark it as reconfirmed now and extend its trust + // interval. } else if state.addr.addr == addr { state.confirmed_at = confirmed_at; state.trust_until = Some(source.trust_until(confirmed_at)); + // If we receive a pong on a different port but the same IP address as the current best addr, + // we assume that the endpoint has rebound, and thus use the new port. + } else if state.addr.addr.ip() == addr.ip() { + self.insert(addr, latency, source, confirmed_at) } } }