Skip to content

Commit 72cb071

Browse files
wip
1 parent bbccf98 commit 72cb071

File tree

5 files changed

+94
-95
lines changed

5 files changed

+94
-95
lines changed

iroh/src/endpoint.rs

Lines changed: 16 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ use crate::{
4545
magicsock::{self, Handle, NodeIdMappedAddr, OwnAddressSnafu},
4646
metrics::EndpointMetrics,
4747
net_report::Report,
48-
tls,
48+
tls, RelayProtocol,
4949
};
5050

5151
mod rtt_actor;
@@ -732,13 +732,12 @@ impl Endpoint {
732732
self.add_node_addr(node_addr.clone())?;
733733
}
734734
let node_id = node_addr.node_id;
735-
let relay_url = node_addr.relay_url.clone();
736735

737736
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this
738737
// address. Start discovery for this node if it's enabled and we have no valid or
739738
// verified address information for this node. Dropping the discovery cancels any
740739
// still running task.
741-
let (mapped_addr, direct_addresses, _discovery_drop_guard) = self
740+
let (mapped_addr, _discovery_drop_guard) = self
742741
.get_mapping_addr_and_maybe_start_discovery(node_addr)
743742
.await
744743
.context(NoAddressSnafu)?;
@@ -750,12 +749,7 @@ impl Endpoint {
750749
// Start connecting via quinn. This will time out after 10 seconds if no reachable
751750
// address is available.
752751

753-
debug!(
754-
?mapped_addr,
755-
?direct_addresses,
756-
?relay_url,
757-
"Attempting connection..."
758-
);
752+
debug!(?mapped_addr, "Attempting connection...");
759753
let client_config = {
760754
let mut alpn_protocols = vec![alpn.to_vec()];
761755
alpn_protocols.extend(options.additional_alpns);
@@ -768,27 +762,18 @@ impl Endpoint {
768762
client_config
769763
};
770764

771-
// TODO: race available addresses, this is currently only using the relay addr to connect
772-
let dest_addr = if relay_url.is_none() && !direct_addresses.is_empty() {
773-
direct_addresses[0]
774-
} else {
775-
mapped_addr.private_socket_addr()
776-
};
765+
let dest_addr = mapped_addr.private_socket_addr();
777766
let server_name = &tls::name::encode(node_id);
778767
let connect = self
779768
.msock
780769
.endpoint()
781770
.connect_with(client_config, dest_addr, server_name)
782771
.context(QuinnSnafu)?;
783772

784-
let mut paths = direct_addresses;
785-
paths.push(mapped_addr.private_socket_addr());
786-
787773
Ok(Connecting {
788774
inner: connect,
789775
ep: self.clone(),
790776
remote_node_id: Some(node_id),
791-
paths,
792777
_discovery_drop_guard,
793778
})
794779
}
@@ -1383,20 +1368,18 @@ impl Endpoint {
13831368
async fn get_mapping_addr_and_maybe_start_discovery(
13841369
&self,
13851370
node_addr: NodeAddr,
1386-
) -> Result<(NodeIdMappedAddr, Vec<SocketAddr>, Option<DiscoveryTask>), GetMappingAddressError>
1387-
{
1371+
) -> Result<(NodeIdMappedAddr, Option<DiscoveryTask>), GetMappingAddressError> {
13881372
let node_id = node_addr.node_id;
13891373

13901374
// Only return a mapped addr if we have some way of dialing this node, in other
13911375
// words, we have either a relay URL or at least one direct address.
13921376
let addr = if self.msock.has_send_address(node_id) {
1393-
let maddr = self.msock.get_mapping_addr(node_id);
1394-
maddr.map(|maddr| (maddr, self.msock.get_direct_addrs(node_id)))
1377+
self.msock.get_mapping_addr(node_id)
13951378
} else {
13961379
None
13971380
};
13981381
match addr {
1399-
Some((maddr, direct)) => {
1382+
Some(maddr) => {
14001383
// We have some way of dialing this node, but that doesn't actually mean
14011384
// we can actually connect to any of these addresses.
14021385
// Therefore, we will invoke the discovery service if we haven't received from the
@@ -1408,7 +1391,7 @@ impl Endpoint {
14081391
let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay)
14091392
.ok()
14101393
.flatten();
1411-
Ok((maddr, direct, discovery))
1394+
Ok((maddr, discovery))
14121395
}
14131396

14141397
None => {
@@ -1423,8 +1406,7 @@ impl Endpoint {
14231406
.await
14241407
.context(get_mapping_address_error::DiscoverSnafu)?;
14251408
if let Some(addr) = self.msock.get_mapping_addr(node_id) {
1426-
let direct = self.msock.get_direct_addrs(node_id);
1427-
Ok((addr, direct, Some(discovery)))
1409+
Ok((addr, Some(discovery)))
14281410
} else {
14291411
Err(get_mapping_address_error::NoAddressSnafu.build())
14301412
}
@@ -1653,8 +1635,6 @@ pub struct Connecting {
16531635
inner: quinn::Connecting,
16541636
ep: Endpoint,
16551637
remote_node_id: Option<NodeId>,
1656-
/// Additional paths to open once a connection is created
1657-
paths: Vec<SocketAddr>,
16581638
/// We run discovery as long as we haven't established a connection yet.
16591639
#[debug("Option<DiscoveryTask>")]
16601640
_discovery_drop_guard: Option<DiscoveryTask>,
@@ -1783,21 +1763,15 @@ impl Future for Connecting {
17831763
if let Some(remote) = *this.remote_node_id {
17841764
let weak_handle = conn.inner.weak_handle();
17851765
let path_events = conn.inner.path_events();
1786-
this.ep.msock.register_connection(
1787-
remote,
1788-
weak_handle,
1789-
path_events,
1790-
this.paths.clone(),
1791-
);
1766+
this.ep
1767+
.msock
1768+
.register_connection(remote, weak_handle, path_events);
17921769
} else if let Ok(remote) = conn.remote_node_id() {
17931770
let weak_handle = conn.inner.weak_handle();
17941771
let path_events = conn.inner.path_events();
1795-
this.ep.msock.register_connection(
1796-
remote,
1797-
weak_handle,
1798-
path_events,
1799-
this.paths.clone(),
1800-
);
1772+
this.ep
1773+
.msock
1774+
.register_connection(remote, weak_handle, path_events);
18011775
} else {
18021776
warn!("unable to determine node id for the remote");
18031777
}
@@ -2284,6 +2258,7 @@ mod tests {
22842258
};
22852259

22862260
use iroh_base::{NodeAddr, NodeId, SecretKey};
2261+
use iroh_relay::http::Protocol;
22872262
use n0_future::{task::AbortOnDropHandle, StreamExt};
22882263
use n0_snafu::{Error, Result, ResultExt};
22892264
use n0_watcher::Watcher;

iroh/src/magicsock.rs

Lines changed: 22 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@ use netwatch::{ip::LocalAddresses, UdpSocket};
4545
use quinn::{AsyncUdpSocket, ServerConfig, WeakConnectionHandle};
4646
use quinn_proto::PathEvent;
4747
use rand::Rng;
48+
use relay_mapped_addrs::RelayMappedAddresses;
4849
use smallvec::SmallVec;
4950
use snafu::{ResultExt, Snafu};
5051
use tokio::sync::{mpsc, Mutex as AsyncMutex};
@@ -79,6 +80,7 @@ use crate::{
7980

8081
mod metrics;
8182
mod node_map;
83+
mod relay_mapped_addrs;
8284

8385
pub(crate) mod transports;
8486

@@ -204,6 +206,8 @@ pub(crate) struct MagicSock {
204206

205207
/// Tracks the mapped IP addresses
206208
ip_mapped_addrs: IpMappedAddresses,
209+
/// Tracks the mapped IP addresses
210+
relay_mapped_addrs: RelayMappedAddresses,
207211
/// Local addresses
208212
local_addrs_watch: LocalAddrsWatch,
209213
/// Currently bound IP addresses of all sockets
@@ -299,25 +303,10 @@ impl MagicSock {
299303
remote: NodeId,
300304
conn: WeakConnectionHandle,
301305
mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
302-
paths: Vec<SocketAddr>,
303306
) {
304307
self.connection_map.insert(remote, conn);
305-
task::spawn(async move {
306-
let conn = conn.clone();
307-
for addr in paths {
308-
match conn.open_path(addr, quinn_proto::PathStatus::Backup).await {
309-
Ok(path) => {
310-
path.set_max_idle_timeout(Some(ENDPOINTS_FRESH_ENOUGH_DURATION))
311-
.ok();
312-
path.set_keep_alive_interval(Some(HEARTBEAT_INTERVAL)).ok();
313-
}
314-
Err(err) => {
315-
warn!("failed to open path {:?}", err);
316-
}
317-
}
318-
}
319-
});
320308

309+
// TODO: open additional paths
321310
// TODO: track task
322311
// TODO: find a good home for this
323312
task::spawn(async move {
@@ -465,6 +454,11 @@ impl MagicSock {
465454
self.node_map
466455
.add_node_addr(addr.clone(), source, &self.metrics.magicsock);
467456

457+
if let Some(url) = addr.relay_url() {
458+
self.relay_mapped_addrs
459+
.get_or_register(url.clone(), addr.node_id);
460+
}
461+
468462
// Add paths to the existing connections
469463
self.add_paths(addr);
470464

@@ -637,11 +631,8 @@ impl MagicSock {
637631

638632
let mut active_paths = SmallVec::<[_; 3]>::new();
639633

640-
match MappedAddr::from(transmit.destination) {
641-
MappedAddr::None(addr) => {
642-
active_paths.push(transports::Addr::from(addr));
643-
}
644-
MappedAddr::NodeId(dest) => {
634+
match MultipathMappedAddr::from(transmit.destination) {
635+
MultipathMappedAddr::Mixed(dest) => {
645636
trace!(
646637
dst = %dest,
647638
src = ?transmit.src_ip,
@@ -674,7 +665,10 @@ impl MagicSock {
674665
}
675666
}
676667
#[cfg(not(wasm_browser))]
677-
MappedAddr::Ip(dest) => {
668+
MultipathMappedAddr::Ip(addr) => {
669+
active_paths.push(transports::Addr::Ip(addr));
670+
}
671+
MultipathMappedAddr::Relay(dest) => {
678672
trace!(
679673
dst = %dest,
680674
src = ?transmit.src_ip,
@@ -684,9 +678,9 @@ impl MagicSock {
684678

685679
// Check if this is a known IpMappedAddr, and if so, send over UDP
686680
// Get the socket addr
687-
match self.ip_mapped_addrs.get_ip_addr(&dest) {
688-
Some(addr) => {
689-
active_paths.push(transports::Addr::from(addr));
681+
match self.relay_mapped_addrs.get_url(&dest) {
682+
Some((relay, node_id)) => {
683+
active_paths.push(transports::Addr::Relay(relay, node_id));
690684
}
691685
None => {
692686
error!(%dest, "unknown mapped address");
@@ -1107,33 +1101,7 @@ impl From<SocketAddr> for MultipathMappedAddr {
11071101
if let Ok(ip_mapped_addr) = IpMappedAddr::try_from(addr) {
11081102
return Self::Relay(ip_mapped_addr);
11091103
}
1110-
MappedAddr::Self(value)
1111-
}
1112-
}
1113-
}
1114-
}
1115-
1116-
#[derive(Clone, Debug)]
1117-
enum MappedAddr {
1118-
NodeId(NodeIdMappedAddr),
1119-
#[cfg(not(wasm_browser))]
1120-
Ip(IpMappedAddr),
1121-
None(SocketAddr),
1122-
}
1123-
1124-
impl From<SocketAddr> for MappedAddr {
1125-
fn from(value: SocketAddr) -> Self {
1126-
match value.ip() {
1127-
IpAddr::V4(_) => MappedAddr::None(value),
1128-
IpAddr::V6(addr) => {
1129-
if let Ok(node_id_mapped_addr) = NodeIdMappedAddr::try_from(addr) {
1130-
return MappedAddr::NodeId(node_id_mapped_addr);
1131-
}
1132-
#[cfg(not(wasm_browser))]
1133-
if let Ok(ip_mapped_addr) = IpMappedAddr::try_from(addr) {
1134-
return MappedAddr::Ip(ip_mapped_addr);
1135-
}
1136-
MappedAddr::None(value)
1104+
Self::Ip(value)
11371105
}
11381106
}
11391107
}
@@ -1319,6 +1287,7 @@ impl Handle {
13191287
bind_ip(addr_v4, addr_v6, &metrics).context(BindSocketsSnafu)?;
13201288

13211289
let ip_mapped_addrs = IpMappedAddresses::default();
1290+
let relay_mapped_addrs = RelayMappedAddresses::default();
13221291

13231292
let (actor_sender, actor_receiver) = mpsc::channel(256);
13241293

@@ -1366,6 +1335,7 @@ impl Handle {
13661335
node_map,
13671336
connection_map: Default::default(),
13681337
ip_mapped_addrs: ip_mapped_addrs.clone(),
1338+
relay_mapped_addrs,
13691339
discovery,
13701340
discovery_user_data: RwLock::new(discovery_user_data),
13711341
direct_addrs: Default::default(),
Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,54 @@
1+
use std::{collections::BTreeMap, sync::Arc};
2+
3+
use iroh_base::{NodeId, RelayUrl};
4+
use snafu::Snafu;
5+
6+
use crate::net_report::IpMappedAddr;
7+
8+
/// Can occur when converting a [`SocketAddr`] to an [`RelayMappedAddr`]
9+
#[derive(Debug, Snafu)]
10+
#[snafu(display("Failed to convert"))]
11+
pub struct RelayMappedAddrError;
12+
13+
/// A Map of [`RelayMappedAddresses`] to [`SocketAddr`].
14+
#[derive(Debug, Clone, Default)]
15+
pub(crate) struct RelayMappedAddresses(Arc<std::sync::Mutex<Inner>>);
16+
17+
#[derive(Debug, Default)]
18+
pub(super) struct Inner {
19+
by_mapped_addr: BTreeMap<IpMappedAddr, (RelayUrl, NodeId)>,
20+
by_url: BTreeMap<(RelayUrl, NodeId), IpMappedAddr>,
21+
}
22+
23+
impl RelayMappedAddresses {
24+
/// Adds a [`RelayUrl`] to the map and returns the generated [`IpMappedAddr`].
25+
///
26+
/// If this [`RelayUrl`] already exists in the map, it returns its
27+
/// associated [`IpMappedAddr`].
28+
///
29+
/// Otherwise a new [`IpMappedAddr`] is generated for it and returned.
30+
pub(super) fn get_or_register(&self, relay: RelayUrl, node: NodeId) -> IpMappedAddr {
31+
let mut inner = self.0.lock().expect("poisoned");
32+
if let Some(mapped_addr) = inner.by_url.get(&(relay.clone(), node)) {
33+
return *mapped_addr;
34+
}
35+
let ip_mapped_addr = IpMappedAddr::generate();
36+
inner
37+
.by_mapped_addr
38+
.insert(ip_mapped_addr, (relay.clone(), node));
39+
inner.by_url.insert((relay, node), ip_mapped_addr);
40+
ip_mapped_addr
41+
}
42+
43+
/// Returns the [`IpMappedAddr`] for the given [`RelayUrl`].
44+
pub(crate) fn get_mapped_addr(&self, relay: RelayUrl, node: NodeId) -> Option<IpMappedAddr> {
45+
let inner = self.0.lock().expect("poisoned");
46+
inner.by_url.get(&(relay, node)).copied()
47+
}
48+
49+
/// Returns the [`RelayUrl`] for the given [`IpMappedAddr`].
50+
pub(crate) fn get_url(&self, mapped_addr: &IpMappedAddr) -> Option<(RelayUrl, NodeId)> {
51+
let inner = self.0.lock().expect("poisoned");
52+
inner.by_mapped_addr.get(mapped_addr).cloned()
53+
}
54+
}

iroh/src/magicsock/transports/relay.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ impl RelayTransport {
104104
meta_out.len = dm.buf.len();
105105
meta_out.stride = dm.buf.len();
106106
meta_out.ecn = None;
107-
meta_out.dst_ip = None; // TODO: insert the relay url for this relay
107+
meta_out.dst_ip = None;
108108

109109
*addr = (dm.url, dm.src).into();
110110
num_msgs += 1;

iroh/src/net_report/ip_mapped_addrs.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ impl IpMappedAddr {
3838
///
3939
/// This generates a new IPv6 address in the Unique Local Address range (RFC 4193)
4040
/// which is recognised by iroh as an IP mapped address.
41-
pub(super) fn generate() -> Self {
41+
pub(crate) fn generate() -> Self {
4242
let mut addr = [0u8; 16];
4343
addr[0] = Self::ADDR_PREFIXL;
4444
addr[1..6].copy_from_slice(&Self::ADDR_GLOBAL_ID);

0 commit comments

Comments
 (0)