Skip to content

Commit 7986394

Browse files
authored
feat(iroh-net): add MagicEndpoint::conn_type_stream returns a stream that reports connection type changes for a node_id (#2161)
## Description `MagicEndpoint::conn_type_stream` returns a `Stream` that reports changes for a `magicsock::Endpoint` with a given `node_id` in a `magicsock::NodeMap`. It will error if no address information for that `node_id` exists in the `NodeMap`. This PR also adjusts the `Endpoint::info()` method to use the same `ConnectionType` that gets reported to the stream. ## Change checklist - [x] Self-review. - [x] Documentation updates if relevant. - [x] Tests if relevant.
1 parent b07547b commit 7986394

File tree

4 files changed

+236
-23
lines changed

4 files changed

+236
-23
lines changed

iroh-net/src/magic_endpoint.rs

Lines changed: 108 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ use crate::{
1515
discovery::{Discovery, DiscoveryTask},
1616
dns::{default_resolver, DnsResolver},
1717
key::{PublicKey, SecretKey},
18-
magicsock::{self, MagicSock},
18+
magicsock::{self, ConnectionTypeStream, MagicSock},
1919
relay::{RelayMap, RelayMode, RelayUrl},
2020
tls, NodeId,
2121
};
@@ -402,6 +402,16 @@ impl MagicEndpoint {
402402
self.connect(addr, alpn).await
403403
}
404404

405+
/// Returns a stream that reports changes in the [`crate::magicsock::ConnectionType`]
406+
/// for the given `node_id`.
407+
///
408+
/// # Errors
409+
///
410+
/// Will error if we do not have any address information for the given `node_id`
411+
pub fn conn_type_stream(&self, node_id: &PublicKey) -> Result<ConnectionTypeStream> {
412+
self.msock.conn_type_stream(node_id)
413+
}
414+
405415
/// Connect to a remote endpoint.
406416
///
407417
/// A [`NodeAddr`] is required. It must contain the [`NodeId`] to dial and may also contain a
@@ -630,7 +640,7 @@ mod tests {
630640
use rand_core::SeedableRng;
631641
use tracing::{error_span, info, info_span, Instrument};
632642

633-
use crate::test_utils::run_relay_server;
643+
use crate::{magicsock::ConnectionType, test_utils::run_relay_server};
634644

635645
use super::*;
636646

@@ -971,4 +981,100 @@ mod tests {
971981
p1_connect.await.unwrap();
972982
p2_connect.await.unwrap();
973983
}
984+
985+
#[tokio::test]
986+
async fn magic_endpoint_conn_type_stream() {
987+
let _logging_guard = iroh_test::logging::setup();
988+
let (relay_map, relay_url, _relay_guard) = run_relay_server().await.unwrap();
989+
let mut rng = rand_chacha::ChaCha8Rng::seed_from_u64(42);
990+
let ep1_secret_key = SecretKey::generate_with_rng(&mut rng);
991+
let ep2_secret_key = SecretKey::generate_with_rng(&mut rng);
992+
let ep1 = MagicEndpoint::builder()
993+
.secret_key(ep1_secret_key)
994+
.insecure_skip_relay_cert_verify(true)
995+
.alpns(vec![TEST_ALPN.to_vec()])
996+
.relay_mode(RelayMode::Custom(relay_map.clone()))
997+
.bind(0)
998+
.await
999+
.unwrap();
1000+
let ep2 = MagicEndpoint::builder()
1001+
.secret_key(ep2_secret_key)
1002+
.insecure_skip_relay_cert_verify(true)
1003+
.alpns(vec![TEST_ALPN.to_vec()])
1004+
.relay_mode(RelayMode::Custom(relay_map))
1005+
.bind(0)
1006+
.await
1007+
.unwrap();
1008+
1009+
async fn handle_direct_conn(ep: MagicEndpoint, node_id: PublicKey) -> Result<()> {
1010+
let node_addr = NodeAddr::new(node_id);
1011+
ep.add_node_addr(node_addr)?;
1012+
let stream = ep.conn_type_stream(&node_id)?;
1013+
async fn get_direct_event(
1014+
src: &PublicKey,
1015+
dst: &PublicKey,
1016+
mut stream: ConnectionTypeStream,
1017+
) -> Result<()> {
1018+
let src = src.fmt_short();
1019+
let dst = dst.fmt_short();
1020+
while let Some(conn_type) = stream.next().await {
1021+
tracing::info!(me = %src, dst = %dst, conn_type = ?conn_type);
1022+
if matches!(conn_type, ConnectionType::Direct(_)) {
1023+
return Ok(());
1024+
}
1025+
}
1026+
anyhow::bail!("conn_type stream ended before `ConnectionType::Direct`");
1027+
}
1028+
tokio::time::timeout(
1029+
Duration::from_secs(15),
1030+
get_direct_event(&ep.node_id(), &node_id, stream),
1031+
)
1032+
.await??;
1033+
Ok(())
1034+
}
1035+
1036+
let ep1_nodeid = ep1.node_id();
1037+
let ep2_nodeid = ep2.node_id();
1038+
1039+
let ep1_nodeaddr = ep1.my_addr().await.unwrap();
1040+
tracing::info!(
1041+
"node id 1 {ep1_nodeid}, relay URL {:?}",
1042+
ep1_nodeaddr.relay_url()
1043+
);
1044+
tracing::info!("node id 2 {ep2_nodeid}");
1045+
1046+
let res_ep1 = tokio::spawn(handle_direct_conn(ep1.clone(), ep2_nodeid));
1047+
1048+
let ep1_abort_handle = res_ep1.abort_handle();
1049+
let _ep1_guard = CallOnDrop::new(move || {
1050+
ep1_abort_handle.abort();
1051+
});
1052+
1053+
let res_ep2 = tokio::spawn(handle_direct_conn(ep2.clone(), ep1_nodeid));
1054+
let ep2_abort_handle = res_ep2.abort_handle();
1055+
let _ep2_guard = CallOnDrop::new(move || {
1056+
ep2_abort_handle.abort();
1057+
});
1058+
async fn accept(ep: MagicEndpoint) -> (PublicKey, String, quinn::Connection) {
1059+
let incoming = ep.accept().await.unwrap();
1060+
accept_conn(incoming).await.unwrap()
1061+
}
1062+
1063+
// create a node addr with no direct connections
1064+
let ep1_nodeaddr = NodeAddr::from_parts(ep1_nodeid, Some(relay_url), vec![]);
1065+
1066+
let accept_res = tokio::spawn(accept(ep1.clone()));
1067+
let accept_abort_handle = accept_res.abort_handle();
1068+
let _accept_guard = CallOnDrop::new(move || {
1069+
accept_abort_handle.abort();
1070+
});
1071+
1072+
let _conn_2 = ep2.connect(ep1_nodeaddr, TEST_ALPN).await.unwrap();
1073+
1074+
let (got_id, _, _conn) = accept_res.await.unwrap();
1075+
assert_eq!(ep2_nodeid, got_id);
1076+
1077+
res_ep1.await.unwrap().unwrap();
1078+
res_ep2.await.unwrap().unwrap();
1079+
}
9741080
}

iroh-net/src/magicsock.rs

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,9 @@ mod udp_conn;
8080
pub use crate::net::UdpSocket;
8181

8282
pub use self::metrics::Metrics;
83-
pub use self::node_map::{ConnectionType, ControlMsg, DirectAddrInfo, EndpointInfo};
83+
pub use self::node_map::{
84+
ConnectionType, ConnectionTypeStream, ControlMsg, DirectAddrInfo, EndpointInfo,
85+
};
8486
pub use self::timer::Timer;
8587

8688
/// How long we consider a STUN-derived endpoint valid for. UDP NAT mappings typically
@@ -1349,6 +1351,23 @@ impl MagicSock {
13491351
}
13501352
}
13511353

1354+
/// Returns a stream that reports the [`ConnectionType`] we have to the
1355+
/// given `node_id`.
1356+
///
1357+
/// The `NodeMap` continuously monitors the `node_id`'s endpoint for
1358+
/// [`ConnectionType`] changes, and sends the latest [`ConnectionType`]
1359+
/// on the stream.
1360+
///
1361+
/// The current [`ConnectionType`] will the the initial entry on the stream.
1362+
///
1363+
/// # Errors
1364+
///
1365+
/// Will return an error if there is no address information known about the
1366+
/// given `node_id`.
1367+
pub fn conn_type_stream(&self, node_id: &PublicKey) -> Result<node_map::ConnectionTypeStream> {
1368+
self.inner.node_map.conn_type_stream(node_id)
1369+
}
1370+
13521371
/// Get the cached version of the Ipv4 and Ipv6 addrs of the current connection.
13531372
pub fn local_addr(&self) -> Result<(SocketAddr, Option<SocketAddr>)> {
13541373
Ok(self.inner.local_addr())

iroh-net/src/magicsock/node_map.rs

Lines changed: 55 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ use std::{
33
hash::Hash,
44
net::{IpAddr, SocketAddr},
55
path::Path,
6+
pin::Pin,
7+
task::{Context, Poll},
68
time::Instant,
79
};
810

9-
use anyhow::{ensure, Context};
11+
use anyhow::{ensure, Context as _};
12+
use futures::Stream;
1013
use iroh_metrics::inc;
1114
use parking_lot::Mutex;
1215
use stun_rs::TransactionId;
@@ -209,6 +212,19 @@ impl NodeMap {
209212
self.inner.lock().endpoint_infos(now)
210213
}
211214

215+
/// Returns a stream of [`ConnectionType`].
216+
///
217+
/// Sends the current [`ConnectionType`] whenever any changes to the
218+
/// connection type for `public_key` has occured.
219+
///
220+
/// # Errors
221+
///
222+
/// Will return an error if there is not an entry in the [`NodeMap`] for
223+
/// the `public_key`
224+
pub fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result<ConnectionTypeStream> {
225+
self.inner.lock().conn_type_stream(public_key)
226+
}
227+
212228
/// Get the [`EndpointInfo`]s for each endpoint
213229
pub fn endpoint_info(&self, public_key: &PublicKey) -> Option<EndpointInfo> {
214230
self.inner.lock().endpoint_info(public_key)
@@ -389,6 +405,25 @@ impl NodeMapInner {
389405
.map(|ep| ep.info(Instant::now()))
390406
}
391407

408+
/// Returns a stream of [`ConnectionType`].
409+
///
410+
/// Sends the current [`ConnectionType`] whenever any changes to the
411+
/// connection type for `public_key` has occured.
412+
///
413+
/// # Errors
414+
///
415+
/// Will return an error if there is not an entry in the [`NodeMap`] for
416+
/// the `public_key`
417+
fn conn_type_stream(&self, public_key: &PublicKey) -> anyhow::Result<ConnectionTypeStream> {
418+
match self.get(EndpointId::NodeKey(public_key)) {
419+
Some(ep) => Ok(ConnectionTypeStream {
420+
initial: Some(ep.conn_type.get()),
421+
inner: ep.conn_type.watch().into_stream(),
422+
}),
423+
None => anyhow::bail!("No endpoint for {public_key:?} found"),
424+
}
425+
}
426+
392427
fn handle_pong(&mut self, sender: PublicKey, src: &DiscoMessageSource, pong: Pong) {
393428
if let Some(ep) = self.get_mut(EndpointId::NodeKey(&sender)).as_mut() {
394429
let insert = ep.handle_pong(&pong, src.into());
@@ -536,6 +571,25 @@ impl NodeMapInner {
536571
}
537572
}
538573

574+
/// Stream returning `ConnectionTypes`
575+
#[derive(Debug)]
576+
pub struct ConnectionTypeStream {
577+
initial: Option<ConnectionType>,
578+
inner: watchable::WatcherStream<ConnectionType>,
579+
}
580+
581+
impl Stream for ConnectionTypeStream {
582+
type Item = ConnectionType;
583+
584+
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
585+
let this = &mut *self;
586+
if let Some(initial_conn_type) = this.initial.take() {
587+
return Poll::Ready(Some(initial_conn_type));
588+
}
589+
Pin::new(&mut this.inner).poll_next(cx)
590+
}
591+
}
592+
539593
/// An (Ip, Port) pair.
540594
///
541595
/// NOTE: storing an [`IpPort`] is safer than storing a [`SocketAddr`] because for IPv6 socket

0 commit comments

Comments
 (0)