Skip to content

Commit db712c0

Browse files
wip
1 parent 27700fb commit db712c0

File tree

5 files changed

+94
-95
lines changed

5 files changed

+94
-95
lines changed

iroh/src/endpoint.rs

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::{
4545
magicsock::{self, Handle, NodeIdMappedAddr, OwnAddressSnafu},
4646
metrics::EndpointMetrics,
4747
net_report::Report,
48-
tls,
48+
tls, RelayProtocol,
4949
};
5050

5151
mod rtt_actor;
@@ -732,13 +732,12 @@ impl Endpoint {
732732
self.add_node_addr(node_addr.clone())?;
733733
}
734734
let node_id = node_addr.node_id;
735-
let relay_url = node_addr.relay_url.clone();
736735

737736
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this
738737
// address. Start discovery for this node if it's enabled and we have no valid or
739738
// verified address information for this node. Dropping the discovery cancels any
740739
// still running task.
741-
let (mapped_addr, direct_addresses, _discovery_drop_guard) = self
740+
let (mapped_addr, _discovery_drop_guard) = self
742741
.get_mapping_addr_and_maybe_start_discovery(node_addr)
743742
.await
744743
.context(NoAddressSnafu)?;
@@ -750,12 +749,7 @@ impl Endpoint {
750749
// Start connecting via quinn. This will time out after 10 seconds if no reachable
751750
// address is available.
752751

753-
debug!(
754-
?mapped_addr,
755-
?direct_addresses,
756-
?relay_url,
757-
"Attempting connection..."
758-
);
752+
debug!(?mapped_addr, "Attempting connection...");
759753
let client_config = {
760754
let mut alpn_protocols = vec![alpn.to_vec()];
761755
alpn_protocols.extend(options.additional_alpns);
@@ -768,27 +762,18 @@ impl Endpoint {
768762
client_config
769763
};
770764

771-
// TODO: race available addresses, this is currently only using the relay addr to connect
772-
let dest_addr = if relay_url.is_none() && !direct_addresses.is_empty() {
773-
direct_addresses[0]
774-
} else {
775-
mapped_addr.private_socket_addr()
776-
};
765+
let dest_addr = mapped_addr.private_socket_addr();
777766
let server_name = &tls::name::encode(node_id);
778767
let connect = self
779768
.msock
780769
.endpoint()
781770
.connect_with(client_config, dest_addr, server_name)
782771
.context(QuinnSnafu)?;
783772

784-
let mut paths = direct_addresses;
785-
paths.push(mapped_addr.private_socket_addr());
786-
787773
Ok(Connecting {
788774
inner: connect,
789775
ep: self.clone(),
790776
remote_node_id: Some(node_id),
791-
paths,
792777
_discovery_drop_guard,
793778
})
794779
}
@@ -1383,20 +1368,18 @@ impl Endpoint {
13831368
async fn get_mapping_addr_and_maybe_start_discovery(
13841369
&self,
13851370
node_addr: NodeAddr,
1386-
) -> Result<(NodeIdMappedAddr, Vec<SocketAddr>, Option<DiscoveryTask>), GetMappingAddressError>
1387-
{
1371+
) -> Result<(NodeIdMappedAddr, Option<DiscoveryTask>), GetMappingAddressError> {
13881372
let node_id = node_addr.node_id;
13891373

13901374
// Only return a mapped addr if we have some way of dialing this node, in other
13911375
// words, we have either a relay URL or at least one direct address.
13921376
let addr = if self.msock.has_send_address(node_id) {
1393-
let maddr = self.msock.get_mapping_addr(node_id);
1394-
maddr.map(|maddr| (maddr, self.msock.get_direct_addrs(node_id)))
1377+
self.msock.get_mapping_addr(node_id)
13951378
} else {
13961379
None
13971380
};
13981381
match addr {
1399-
Some((maddr, direct)) => {
1382+
Some(maddr) => {
14001383
// We have some way of dialing this node, but that doesn't actually mean
14011384
// we can actually connect to any of these addresses.
14021385
// Therefore, we will invoke the discovery service if we haven't received from the
@@ -1408,7 +1391,7 @@ impl Endpoint {
14081391
let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay)
14091392
.ok()
14101393
.flatten();
1411-
Ok((maddr, direct, discovery))
1394+
Ok((maddr, discovery))
14121395
}
14131396

14141397
None => {
@@ -1423,8 +1406,7 @@ impl Endpoint {
14231406
.await
14241407
.context(get_mapping_address_error::DiscoverSnafu)?;
14251408
if let Some(addr) = self.msock.get_mapping_addr(node_id) {
1426-
let direct = self.msock.get_direct_addrs(node_id);
1427-
Ok((addr, direct, Some(discovery)))
1409+
Ok((addr, Some(discovery)))
14281410
} else {
14291411
Err(get_mapping_address_error::NoAddressSnafu.build())
14301412
}
@@ -1653,8 +1635,6 @@ pub struct Connecting {
16531635
inner: quinn::Connecting,
16541636
ep: Endpoint,
16551637
remote_node_id: Option<NodeId>,
1656-
/// Additional paths to open once a connection is created
1657-
paths: Vec<SocketAddr>,
16581638
/// We run discovery as long as we haven't established a connection yet.
16591639
#[debug("Option<DiscoveryTask>")]
16601640
_discovery_drop_guard: Option<DiscoveryTask>,
@@ -1783,21 +1763,15 @@ impl Future for Connecting {
17831763
if let Some(remote) = *this.remote_node_id {
17841764
let weak_handle = conn.inner.weak_handle();
17851765
let path_events = conn.inner.path_events();
1786-
this.ep.msock.register_connection(
1787-
remote,
1788-
weak_handle,
1789-
path_events,
1790-
this.paths.clone(),
1791-
);
1766+
this.ep
1767+
.msock
1768+
.register_connection(remote, weak_handle, path_events);
17921769
} else if let Ok(remote) = conn.remote_node_id() {
17931770
let weak_handle = conn.inner.weak_handle();
17941771
let path_events = conn.inner.path_events();
1795-
this.ep.msock.register_connection(
1796-
remote,
1797-
weak_handle,
1798-
path_events,
1799-
this.paths.clone(),
1800-
);
1772+
this.ep
1773+
.msock
1774+
.register_connection(remote, weak_handle, path_events);
18011775
} else {
18021776
warn!("unable to determine node id for the remote");
18031777
}
@@ -2284,6 +2258,7 @@ mod tests {
22842258
};
22852259

22862260
use iroh_base::{NodeAddr, NodeId, SecretKey};
2261+
use iroh_relay::http::Protocol;
22872262
use n0_future::{task::AbortOnDropHandle, StreamExt};
22882263
use n0_snafu::{Error, Result, ResultExt};
22892264
use n0_watcher::Watcher;

iroh/src/magicsock.rs

Lines changed: 22 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use netwatch::{ip::LocalAddresses, UdpSocket};
4545
use quinn::{AsyncUdpSocket, ServerConfig, WeakConnectionHandle};
4646
use quinn_proto::PathEvent;
4747
use rand::Rng;
48+
use relay_mapped_addrs::RelayMappedAddresses;
4849
use smallvec::SmallVec;
4950
use snafu::{ResultExt, Snafu};
5051
use tokio::sync::{mpsc, Mutex as AsyncMutex};
@@ -79,6 +80,7 @@ use crate::{
7980

8081
mod metrics;
8182
mod node_map;
83+
mod relay_mapped_addrs;
8284

8385
pub(crate) mod transports;
8486

@@ -200,6 +202,8 @@ pub(crate) struct MagicSock {
200202

201203
/// Tracks the mapped IP addresses
202204
ip_mapped_addrs: IpMappedAddresses,
205+
/// Tracks the mapped IP addresses
206+
relay_mapped_addrs: RelayMappedAddresses,
203207
/// Local addresses
204208
local_addrs_watch: LocalAddrsWatch,
205209
/// Currently bound IP addresses of all sockets
@@ -295,25 +299,10 @@ impl MagicSock {
295299
remote: NodeId,
296300
conn: WeakConnectionHandle,
297301
mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
298-
paths: Vec<SocketAddr>,
299302
) {
300303
self.connection_map.insert(remote, conn);
301-
task::spawn(async move {
302-
let conn = conn.clone();
303-
for addr in paths {
304-
match conn.open_path(addr, quinn_proto::PathStatus::Backup).await {
305-
Ok(path) => {
306-
path.set_max_idle_timeout(Some(ENDPOINTS_FRESH_ENOUGH_DURATION))
307-
.ok();
308-
path.set_keep_alive_interval(Some(HEARTBEAT_INTERVAL)).ok();
309-
}
310-
Err(err) => {
311-
warn!("failed to open path {:?}", err);
312-
}
313-
}
314-
}
315-
});
316304

305+
// TODO: open additional paths
317306
// TODO: track task
318307
// TODO: find a good home for this
319308
task::spawn(async move {
@@ -461,6 +450,11 @@ impl MagicSock {
461450
self.node_map
462451
.add_node_addr(addr.clone(), source, &self.metrics.magicsock);
463452

453+
if let Some(url) = addr.relay_url() {
454+
self.relay_mapped_addrs
455+
.get_or_register(url.clone(), addr.node_id);
456+
}
457+
464458
// Add paths to the existing connections
465459
self.add_paths(addr);
466460

@@ -633,11 +627,8 @@ impl MagicSock {
633627

634628
let mut active_paths = SmallVec::<[_; 3]>::new();
635629

636-
match MappedAddr::from(transmit.destination) {
637-
MappedAddr::None(addr) => {
638-
active_paths.push(transports::Addr::from(addr));
639-
}
640-
MappedAddr::NodeId(dest) => {
630+
match MultipathMappedAddr::from(transmit.destination) {
631+
MultipathMappedAddr::Mixed(dest) => {
641632
trace!(
642633
dst = %dest,
643634
src = ?transmit.src_ip,
@@ -670,7 +661,10 @@ impl MagicSock {
670661
}
671662
}
672663
#[cfg(not(wasm_browser))]
673-
MappedAddr::Ip(dest) => {
664+
MultipathMappedAddr::Ip(addr) => {
665+
active_paths.push(transports::Addr::Ip(addr));
666+
}
667+
MultipathMappedAddr::Relay(dest) => {
674668
trace!(
675669
dst = %dest,
676670
src = ?transmit.src_ip,
@@ -680,9 +674,9 @@ impl MagicSock {
680674

681675
// Check if this is a known IpMappedAddr, and if so, send over UDP
682676
// Get the socket addr
683-
match self.ip_mapped_addrs.get_ip_addr(&dest) {
684-
Some(addr) => {
685-
active_paths.push(transports::Addr::from(addr));
677+
match self.relay_mapped_addrs.get_url(&dest) {
678+
Some((relay, node_id)) => {
679+
active_paths.push(transports::Addr::Relay(relay, node_id));
686680
}
687681
None => {
688682
error!(%dest, "unknown mapped address");
@@ -1103,33 +1097,7 @@ impl From<SocketAddr> for MultipathMappedAddr {
11031097
if let Ok(ip_mapped_addr) = IpMappedAddr::try_from(addr) {
11041098
return Self::Relay(ip_mapped_addr);
11051099
}
1106-
MappedAddr::Self(value)
1107-
}
1108-
}
1109-
}
1110-
}
1111-
1112-
#[derive(Clone, Debug)]
1113-
enum MappedAddr {
1114-
NodeId(NodeIdMappedAddr),
1115-
#[cfg(not(wasm_browser))]
1116-
Ip(IpMappedAddr),
1117-
None(SocketAddr),
1118-
}
1119-
1120-
impl From<SocketAddr> for MappedAddr {
1121-
fn from(value: SocketAddr) -> Self {
1122-
match value.ip() {
1123-
IpAddr::V4(_) => MappedAddr::None(value),
1124-
IpAddr::V6(addr) => {
1125-
if let Ok(node_id_mapped_addr) = NodeIdMappedAddr::try_from(addr) {
1126-
return MappedAddr::NodeId(node_id_mapped_addr);
1127-
}
1128-
#[cfg(not(wasm_browser))]
1129-
if let Ok(ip_mapped_addr) = IpMappedAddr::try_from(addr) {
1130-
return MappedAddr::Ip(ip_mapped_addr);
1131-
}
1132-
MappedAddr::None(value)
1100+
Self::Ip(value)
11331101
}
11341102
}
11351103
}
@@ -1315,6 +1283,7 @@ impl Handle {
13151283
bind_ip(addr_v4, addr_v6, &metrics).context(BindSocketsSnafu)?;
13161284

13171285
let ip_mapped_addrs = IpMappedAddresses::default();
1286+
let relay_mapped_addrs = RelayMappedAddresses::default();
13181287

13191288
let (actor_sender, actor_receiver) = mpsc::channel(256);
13201289

@@ -1362,6 +1331,7 @@ impl Handle {
13621331
node_map,
13631332
connection_map: Default::default(),
13641333
ip_mapped_addrs: ip_mapped_addrs.clone(),
1334+
relay_mapped_addrs,
13651335
discovery,
13661336
discovery_user_data: RwLock::new(discovery_user_data),
13671337
direct_addrs: Default::default(),
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::{collections::BTreeMap, sync::Arc};
2+
3+
use iroh_base::{NodeId, RelayUrl};
4+
use snafu::Snafu;
5+
6+
use crate::net_report::IpMappedAddr;
7+
8+
/// Can occur when converting a [`SocketAddr`] to an [`RelayMappedAddr`]
9+
#[derive(Debug, Snafu)]
10+
#[snafu(display("Failed to convert"))]
11+
pub struct RelayMappedAddrError;
12+
13+
/// A Map of [`RelayMappedAddresses`] to [`SocketAddr`].
14+
#[derive(Debug, Clone, Default)]
15+
pub(crate) struct RelayMappedAddresses(Arc<std::sync::Mutex<Inner>>);
16+
17+
#[derive(Debug, Default)]
18+
pub(super) struct Inner {
19+
by_mapped_addr: BTreeMap<IpMappedAddr, (RelayUrl, NodeId)>,
20+
by_url: BTreeMap<(RelayUrl, NodeId), IpMappedAddr>,
21+
}
22+
23+
impl RelayMappedAddresses {
24+
/// Adds a [`RelayUrl`] to the map and returns the generated [`IpMappedAddr`].
25+
///
26+
/// If this [`RelayUrl`] already exists in the map, it returns its
27+
/// associated [`IpMappedAddr`].
28+
///
29+
/// Otherwise a new [`IpMappedAddr`] is generated for it and returned.
30+
pub(super) fn get_or_register(&self, relay: RelayUrl, node: NodeId) -> IpMappedAddr {
31+
let mut inner = self.0.lock().expect("poisoned");
32+
if let Some(mapped_addr) = inner.by_url.get(&(relay.clone(), node)) {
33+
return *mapped_addr;
34+
}
35+
let ip_mapped_addr = IpMappedAddr::generate();
36+
inner
37+
.by_mapped_addr
38+
.insert(ip_mapped_addr, (relay.clone(), node));
39+
inner.by_url.insert((relay, node), ip_mapped_addr);
40+
ip_mapped_addr
41+
}
42+
43+
/// Returns the [`IpMappedAddr`] for the given [`RelayUrl`].
44+
pub(crate) fn get_mapped_addr(&self, relay: RelayUrl, node: NodeId) -> Option<IpMappedAddr> {
45+
let inner = self.0.lock().expect("poisoned");
46+
inner.by_url.get(&(relay, node)).copied()
47+
}
48+
49+
/// Returns the [`RelayUrl`] for the given [`IpMappedAddr`].
50+
pub(crate) fn get_url(&self, mapped_addr: &IpMappedAddr) -> Option<(RelayUrl, NodeId)> {
51+
let inner = self.0.lock().expect("poisoned");
52+
inner.by_mapped_addr.get(mapped_addr).cloned()
53+
}
54+
}

iroh/src/magicsock/transports/relay.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ impl RelayTransport {
106106
.segment_size
107107
.map_or(dm.datagrams.contents.len(), |s| s as usize);
108108
meta_out.ecn = None;
109-
meta_out.dst_ip = None; // TODO: insert the relay url for this relay
109+
meta_out.dst_ip = None;
110110

111111
*addr = (dm.url, dm.src).into();
112112
num_msgs += 1;

iroh/src/net_report/ip_mapped_addrs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl IpMappedAddr {
3838
///
3939
/// This generates a new IPv6 address in the Unique Local Address range (RFC 4193)
4040
/// which is recognised by iroh as an IP mapped address.
41-
pub(super) fn generate() -> Self {
41+
pub(crate) fn generate() -> Self {
4242
let mut addr = [0u8; 16];
4343
addr[0] = Self::ADDR_PREFIXL;
4444
addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID);

0 commit comments

Comments
 (0)