Skip to content

Commit 27700fb

Browse files
start figuring out more details
1 parent fbe285e commit 27700fb

File tree

3 files changed

+126
-43
lines changed

3 files changed

+126
-43
lines changed

iroh/src/endpoint.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -732,14 +732,13 @@ impl Endpoint {
732732
self.add_node_addr(node_addr.clone())?;
733733
}
734734
let node_id = node_addr.node_id;
735-
let direct_addresses = node_addr.direct_addresses.clone();
736735
let relay_url = node_addr.relay_url.clone();
737736

738737
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this
739738
// address. Start discovery for this node if it's enabled and we have no valid or
740739
// verified address information for this node. Dropping the discovery cancels any
741740
// still running task.
742-
let (mapped_addr, _discovery_drop_guard) = self
741+
let (mapped_addr, direct_addresses, _discovery_drop_guard) = self
743742
.get_mapping_addr_and_maybe_start_discovery(node_addr)
744743
.await
745744
.context(NoAddressSnafu)?;
@@ -770,18 +769,26 @@ impl Endpoint {
770769
};
771770

772771
// TODO: race available addresses, this is currently only using the relay addr to connect
773-
let dest_addr = mapped_addr.private_socket_addr();
772+
let dest_addr = if relay_url.is_none() && !direct_addresses.is_empty() {
773+
direct_addresses[0]
774+
} else {
775+
mapped_addr.private_socket_addr()
776+
};
774777
let server_name = &tls::name::encode(node_id);
775778
let connect = self
776779
.msock
777780
.endpoint()
778781
.connect_with(client_config, dest_addr, server_name)
779782
.context(QuinnSnafu)?;
780783

784+
let mut paths = direct_addresses;
785+
paths.push(mapped_addr.private_socket_addr());
786+
781787
Ok(Connecting {
782788
inner: connect,
783789
ep: self.clone(),
784790
remote_node_id: Some(node_id),
791+
paths,
785792
_discovery_drop_guard,
786793
})
787794
}
@@ -1376,18 +1383,20 @@ impl Endpoint {
13761383
async fn get_mapping_addr_and_maybe_start_discovery(
13771384
&self,
13781385
node_addr: NodeAddr,
1379-
) -> Result<(NodeIdMappedAddr, Option<DiscoveryTask>), GetMappingAddressError> {
1386+
) -> Result<(NodeIdMappedAddr, Vec<SocketAddr>, Option<DiscoveryTask>), GetMappingAddressError>
1387+
{
13801388
let node_id = node_addr.node_id;
13811389

13821390
// Only return a mapped addr if we have some way of dialing this node, in other
13831391
// words, we have either a relay URL or at least one direct address.
13841392
let addr = if self.msock.has_send_address(node_id) {
1385-
self.msock.get_mapping_addr(node_id)
1393+
let maddr = self.msock.get_mapping_addr(node_id);
1394+
maddr.map(|maddr| (maddr, self.msock.get_direct_addrs(node_id)))
13861395
} else {
13871396
None
13881397
};
13891398
match addr {
1390-
Some(addr) => {
1399+
Some((maddr, direct)) => {
13911400
// We have some way of dialing this node, but that doesn't actually mean
13921401
// we can actually connect to any of these addresses.
13931402
// Therefore, we will invoke the discovery service if we haven't received from the
@@ -1399,7 +1408,7 @@ impl Endpoint {
13991408
let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay)
14001409
.ok()
14011410
.flatten();
1402-
Ok((addr, discovery))
1411+
Ok((maddr, direct, discovery))
14031412
}
14041413

14051414
None => {
@@ -1414,7 +1423,8 @@ impl Endpoint {
14141423
.await
14151424
.context(get_mapping_address_error::DiscoverSnafu)?;
14161425
if let Some(addr) = self.msock.get_mapping_addr(node_id) {
1417-
Ok((addr, Some(discovery)))
1426+
let direct = self.msock.get_direct_addrs(node_id);
1427+
Ok((addr, direct, Some(discovery)))
14181428
} else {
14191429
Err(get_mapping_address_error::NoAddressSnafu.build())
14201430
}
@@ -1643,6 +1653,8 @@ pub struct Connecting {
16431653
inner: quinn::Connecting,
16441654
ep: Endpoint,
16451655
remote_node_id: Option<NodeId>,
1656+
/// Additional paths to open once a connection is created
1657+
paths: Vec<SocketAddr>,
16461658
/// We run discovery as long as we haven't established a connection yet.
16471659
#[debug("Option<DiscoveryTask>")]
16481660
_discovery_drop_guard: Option<DiscoveryTask>,
@@ -1771,15 +1783,21 @@ impl Future for Connecting {
17711783
if let Some(remote) = *this.remote_node_id {
17721784
let weak_handle = conn.inner.weak_handle();
17731785
let path_events = conn.inner.path_events();
1774-
this.ep
1775-
.msock
1776-
.register_connection(remote, weak_handle, path_events);
1786+
this.ep.msock.register_connection(
1787+
remote,
1788+
weak_handle,
1789+
path_events,
1790+
this.paths.clone(),
1791+
);
17771792
} else if let Ok(remote) = conn.remote_node_id() {
17781793
let weak_handle = conn.inner.weak_handle();
17791794
let path_events = conn.inner.path_events();
1780-
this.ep
1781-
.msock
1782-
.register_connection(remote, weak_handle, path_events);
1795+
this.ep.msock.register_connection(
1796+
remote,
1797+
weak_handle,
1798+
path_events,
1799+
this.paths.clone(),
1800+
);
17831801
} else {
17841802
warn!("unable to determine node id for the remote");
17851803
}

iroh/src/magicsock.rs

Lines changed: 85 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,24 @@ impl MagicSock {
295295
remote: NodeId,
296296
conn: WeakConnectionHandle,
297297
mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
298+
paths: Vec<SocketAddr>,
298299
) {
299300
self.connection_map.insert(remote, conn);
301+
task::spawn(async move {
302+
let conn = conn.clone();
303+
for addr in paths {
304+
match conn.open_path(addr, quinn_proto::PathStatus::Backup).await {
305+
Ok(path) => {
306+
path.set_max_idle_timeout(Some(ENDPOINTS_FRESH_ENOUGH_DURATION))
307+
.ok();
308+
path.set_keep_alive_interval(Some(HEARTBEAT_INTERVAL)).ok();
309+
}
310+
Err(err) => {
311+
warn!("failed to open path {:?}", err);
312+
}
313+
}
314+
}
315+
});
300316

301317
// TODO: track task
302318
// TODO: find a good home for this
@@ -422,6 +438,10 @@ impl MagicSock {
422438
self.node_map.get_quic_mapped_addr_for_node_key(node_id)
423439
}
424440

441+
pub(crate) fn get_direct_addrs(&self, node_id: NodeId) -> Vec<SocketAddr> {
442+
self.node_map.get_direct_addrs(node_id)
443+
}
444+
425445
/// Add addresses for a node to the magic socket's addresbook.
426446
#[instrument(skip_all)]
427447
pub fn add_node_addr(
@@ -1055,6 +1075,40 @@ impl MagicSock {
10551075
}
10561076
}
10571077

1078+
/// Definies the translation of addresses in quinn land vs iroh land.
1079+
///
1080+
/// This is necessary, because quinn can only reason about `SocketAddr`s.
1081+
#[derive(Clone, Debug)]
1082+
pub(crate) enum MultipathMappedAddr {
1083+
/// Used for the initial connection.
1084+
/// - Only used for sending
1085+
/// - This means send on all known paths/transports
1086+
Mixed(NodeIdMappedAddr),
1087+
/// Relay based transport address
1088+
Relay(IpMappedAddr), // TODO: RelayMappedAddr?
1089+
/// IP based transport address
1090+
#[cfg(not(wasm_browser))]
1091+
Ip(SocketAddr),
1092+
}
1093+
1094+
impl From<SocketAddr> for MultipathMappedAddr {
1095+
fn from(value: SocketAddr) -> Self {
1096+
match value.ip() {
1097+
IpAddr::V4(_) => Self::Ip(value),
1098+
IpAddr::V6(addr) => {
1099+
if let Ok(node_id_mapped_addr) = NodeIdMappedAddr::try_from(addr) {
1100+
return Self::Mixed(node_id_mapped_addr);
1101+
}
1102+
#[cfg(not(wasm_browser))]
1103+
if let Ok(ip_mapped_addr) = IpMappedAddr::try_from(addr) {
1104+
return Self::Relay(ip_mapped_addr);
1105+
}
1106+
MappedAddr::Self(value)
1107+
}
1108+
}
1109+
}
1110+
}
1111+
10581112
#[derive(Clone, Debug)]
10591113
enum MappedAddr {
10601114
NodeId(NodeIdMappedAddr),
@@ -3187,17 +3241,18 @@ mod tests {
31873241
let _accept_task = AbortOnDropHandle::new(accept_task);
31883242

31893243
// Add an empty entry in the NodeMap of ep_1
3190-
msock_1.node_map.add_node_addr(
3191-
NodeAddr {
3192-
node_id: node_id_2,
3193-
relay_url: None,
3194-
direct_addresses: Default::default(),
3195-
},
3196-
Source::NamedApp {
3197-
name: "test".into(),
3198-
},
3199-
&msock_1.metrics.magicsock,
3200-
);
3244+
msock_1
3245+
.add_node_addr(
3246+
NodeAddr {
3247+
node_id: node_id_2,
3248+
relay_url: None,
3249+
direct_addresses: Default::default(),
3250+
},
3251+
Source::NamedApp {
3252+
name: "test".into(),
3253+
},
3254+
)
3255+
.unwrap();
32013256
let addr_2 = msock_1.get_mapping_addr(node_id_2).unwrap();
32023257

32033258
// Set a low max_idle_timeout so quinn gives up on this quickly and our test does
@@ -3224,24 +3279,25 @@ mod tests {
32243279
info!("first connect timed out as expected");
32253280

32263281
// Provide correct addressing information
3227-
msock_1.node_map.add_node_addr(
3228-
NodeAddr {
3229-
node_id: node_id_2,
3230-
relay_url: None,
3231-
direct_addresses: msock_2
3232-
.direct_addresses()
3233-
.initialized()
3234-
.await
3235-
.expect("no direct addrs")
3236-
.into_iter()
3237-
.map(|x| x.addr)
3238-
.collect(),
3239-
},
3240-
Source::NamedApp {
3241-
name: "test".into(),
3242-
},
3243-
&msock_1.metrics.magicsock,
3244-
);
3282+
msock_1
3283+
.add_node_addr(
3284+
NodeAddr {
3285+
node_id: node_id_2,
3286+
relay_url: None,
3287+
direct_addresses: msock_2
3288+
.direct_addresses()
3289+
.initialized()
3290+
.await
3291+
.expect("no direct addrs")
3292+
.into_iter()
3293+
.map(|x| x.addr)
3294+
.collect(),
3295+
},
3296+
Source::NamedApp {
3297+
name: "test".into(),
3298+
},
3299+
)
3300+
.unwrap();
32453301

32463302
// We can now connect
32473303
tokio::time::timeout(Duration::from_secs(10), async move {

iroh/src/magicsock/node_map.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ impl NodeMap {
180180
.map(|ep| *ep.quic_mapped_addr())
181181
}
182182

183+
pub(super) fn get_direct_addrs(&self, node_key: NodeId) -> Vec<SocketAddr> {
184+
self.inner
185+
.lock()
186+
.expect("poisoned")
187+
.get(NodeStateKey::NodeId(node_key))
188+
.map(|ep| ep.direct_addresses().map(Into::into).collect())
189+
.unwrap_or_default()
190+
}
191+
183192
pub(super) fn handle_call_me_maybe(
184193
&self,
185194
sender: PublicKey,

0 commit comments

Comments
 (0)