Skip to content

Commit bbccf98

Browse files
start figuring out more details
1 parent 27fc380 commit bbccf98

File tree

3 files changed

+126
-43
lines changed

3 files changed

+126
-43
lines changed

iroh/src/endpoint.rs

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -732,14 +732,13 @@ impl Endpoint {
732732
self.add_node_addr(node_addr.clone())?;
733733
}
734734
let node_id = node_addr.node_id;
735-
let direct_addresses = node_addr.direct_addresses.clone();
736735
let relay_url = node_addr.relay_url.clone();
737736

738737
// Get the mapped IPv6 address from the magic socket. Quinn will connect to this
739738
// address. Start discovery for this node if it's enabled and we have no valid or
740739
// verified address information for this node. Dropping the discovery cancels any
741740
// still running task.
742-
let (mapped_addr, _discovery_drop_guard) = self
741+
let (mapped_addr, direct_addresses, _discovery_drop_guard) = self
743742
.get_mapping_addr_and_maybe_start_discovery(node_addr)
744743
.await
745744
.context(NoAddressSnafu)?;
@@ -770,18 +769,26 @@ impl Endpoint {
770769
};
771770

772771
// TODO: race available addresses, this is currently only using the relay addr to connect
773-
let dest_addr = mapped_addr.private_socket_addr();
772+
let dest_addr = if relay_url.is_none() && !direct_addresses.is_empty() {
773+
direct_addresses[0]
774+
} else {
775+
mapped_addr.private_socket_addr()
776+
};
774777
let server_name = &tls::name::encode(node_id);
775778
let connect = self
776779
.msock
777780
.endpoint()
778781
.connect_with(client_config, dest_addr, server_name)
779782
.context(QuinnSnafu)?;
780783

784+
let mut paths = direct_addresses;
785+
paths.push(mapped_addr.private_socket_addr());
786+
781787
Ok(Connecting {
782788
inner: connect,
783789
ep: self.clone(),
784790
remote_node_id: Some(node_id),
791+
paths,
785792
_discovery_drop_guard,
786793
})
787794
}
@@ -1376,18 +1383,20 @@ impl Endpoint {
13761383
async fn get_mapping_addr_and_maybe_start_discovery(
13771384
&self,
13781385
node_addr: NodeAddr,
1379-
) -> Result<(NodeIdMappedAddr, Option<DiscoveryTask>), GetMappingAddressError> {
1386+
) -> Result<(NodeIdMappedAddr, Vec<SocketAddr>, Option<DiscoveryTask>), GetMappingAddressError>
1387+
{
13801388
let node_id = node_addr.node_id;
13811389

13821390
// Only return a mapped addr if we have some way of dialing this node, in other
13831391
// words, we have either a relay URL or at least one direct address.
13841392
let addr = if self.msock.has_send_address(node_id) {
1385-
self.msock.get_mapping_addr(node_id)
1393+
let maddr = self.msock.get_mapping_addr(node_id);
1394+
maddr.map(|maddr| (maddr, self.msock.get_direct_addrs(node_id)))
13861395
} else {
13871396
None
13881397
};
13891398
match addr {
1390-
Some(addr) => {
1399+
Some((maddr, direct)) => {
13911400
// We have some way of dialing this node, but that doesn't actually mean
13921401
// we can actually connect to any of these addresses.
13931402
// Therefore, we will invoke the discovery service if we haven't received from the
@@ -1399,7 +1408,7 @@ impl Endpoint {
13991408
let discovery = DiscoveryTask::maybe_start_after_delay(self, node_id, delay)
14001409
.ok()
14011410
.flatten();
1402-
Ok((addr, discovery))
1411+
Ok((maddr, direct, discovery))
14031412
}
14041413

14051414
None => {
@@ -1414,7 +1423,8 @@ impl Endpoint {
14141423
.await
14151424
.context(get_mapping_address_error::DiscoverSnafu)?;
14161425
if let Some(addr) = self.msock.get_mapping_addr(node_id) {
1417-
Ok((addr, Some(discovery)))
1426+
let direct = self.msock.get_direct_addrs(node_id);
1427+
Ok((addr, direct, Some(discovery)))
14181428
} else {
14191429
Err(get_mapping_address_error::NoAddressSnafu.build())
14201430
}
@@ -1643,6 +1653,8 @@ pub struct Connecting {
16431653
inner: quinn::Connecting,
16441654
ep: Endpoint,
16451655
remote_node_id: Option<NodeId>,
1656+
/// Additional paths to open once a connection is created
1657+
paths: Vec<SocketAddr>,
16461658
/// We run discovery as long as we haven't established a connection yet.
16471659
#[debug("Option<DiscoveryTask>")]
16481660
_discovery_drop_guard: Option<DiscoveryTask>,
@@ -1771,15 +1783,21 @@ impl Future for Connecting {
17711783
if let Some(remote) = *this.remote_node_id {
17721784
let weak_handle = conn.inner.weak_handle();
17731785
let path_events = conn.inner.path_events();
1774-
this.ep
1775-
.msock
1776-
.register_connection(remote, weak_handle, path_events);
1786+
this.ep.msock.register_connection(
1787+
remote,
1788+
weak_handle,
1789+
path_events,
1790+
this.paths.clone(),
1791+
);
17771792
} else if let Ok(remote) = conn.remote_node_id() {
17781793
let weak_handle = conn.inner.weak_handle();
17791794
let path_events = conn.inner.path_events();
1780-
this.ep
1781-
.msock
1782-
.register_connection(remote, weak_handle, path_events);
1795+
this.ep.msock.register_connection(
1796+
remote,
1797+
weak_handle,
1798+
path_events,
1799+
this.paths.clone(),
1800+
);
17831801
} else {
17841802
warn!("unable to determine node id for the remote");
17851803
}

iroh/src/magicsock.rs

Lines changed: 85 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -299,8 +299,24 @@ impl MagicSock {
299299
remote: NodeId,
300300
conn: WeakConnectionHandle,
301301
mut path_events: tokio::sync::broadcast::Receiver<PathEvent>,
302+
paths: Vec<SocketAddr>,
302303
) {
303304
self.connection_map.insert(remote, conn);
305+
task::spawn(async move {
306+
let conn = conn.clone();
307+
for addr in paths {
308+
match conn.open_path(addr, quinn_proto::PathStatus::Backup).await {
309+
Ok(path) => {
310+
path.set_max_idle_timeout(Some(ENDPOINTS_FRESH_ENOUGH_DURATION))
311+
.ok();
312+
path.set_keep_alive_interval(Some(HEARTBEAT_INTERVAL)).ok();
313+
}
314+
Err(err) => {
315+
warn!("failed to open path {:?}", err);
316+
}
317+
}
318+
}
319+
});
304320

305321
// TODO: track task
306322
// TODO: find a good home for this
@@ -426,6 +442,10 @@ impl MagicSock {
426442
self.node_map.get_quic_mapped_addr_for_node_key(node_id)
427443
}
428444

445+
pub(crate) fn get_direct_addrs(&self, node_id: NodeId) -> Vec<SocketAddr> {
446+
self.node_map.get_direct_addrs(node_id)
447+
}
448+
429449
/// Add addresses for a node to the magic socket's addresbook.
430450
#[instrument(skip_all)]
431451
pub fn add_node_addr(
@@ -1059,6 +1079,40 @@ impl MagicSock {
10591079
}
10601080
}
10611081

1082+
/// Definies the translation of addresses in quinn land vs iroh land.
1083+
///
1084+
/// This is necessary, because quinn can only reason about `SocketAddr`s.
1085+
#[derive(Clone, Debug)]
1086+
pub(crate) enum MultipathMappedAddr {
1087+
/// Used for the initial connection.
1088+
/// - Only used for sending
1089+
/// - This means send on all known paths/transports
1090+
Mixed(NodeIdMappedAddr),
1091+
/// Relay based transport address
1092+
Relay(IpMappedAddr), // TODO: RelayMappedAddr?
1093+
/// IP based transport address
1094+
#[cfg(not(wasm_browser))]
1095+
Ip(SocketAddr),
1096+
}
1097+
1098+
impl From<SocketAddr> for MultipathMappedAddr {
1099+
fn from(value: SocketAddr) -> Self {
1100+
match value.ip() {
1101+
IpAddr::V4(_) => Self::Ip(value),
1102+
IpAddr::V6(addr) => {
1103+
if let Ok(node_id_mapped_addr) = NodeIdMappedAddr::try_from(addr) {
1104+
return Self::Mixed(node_id_mapped_addr);
1105+
}
1106+
#[cfg(not(wasm_browser))]
1107+
if let Ok(ip_mapped_addr) = IpMappedAddr::try_from(addr) {
1108+
return Self::Relay(ip_mapped_addr);
1109+
}
1110+
MappedAddr::Self(value)
1111+
}
1112+
}
1113+
}
1114+
}
1115+
10621116
#[derive(Clone, Debug)]
10631117
enum MappedAddr {
10641118
NodeId(NodeIdMappedAddr),
@@ -3191,17 +3245,18 @@ mod tests {
31913245
let _accept_task = AbortOnDropHandle::new(accept_task);
31923246

31933247
// Add an empty entry in the NodeMap of ep_1
3194-
msock_1.node_map.add_node_addr(
3195-
NodeAddr {
3196-
node_id: node_id_2,
3197-
relay_url: None,
3198-
direct_addresses: Default::default(),
3199-
},
3200-
Source::NamedApp {
3201-
name: "test".into(),
3202-
},
3203-
&msock_1.metrics.magicsock,
3204-
);
3248+
msock_1
3249+
.add_node_addr(
3250+
NodeAddr {
3251+
node_id: node_id_2,
3252+
relay_url: None,
3253+
direct_addresses: Default::default(),
3254+
},
3255+
Source::NamedApp {
3256+
name: "test".into(),
3257+
},
3258+
)
3259+
.unwrap();
32053260
let addr_2 = msock_1.get_mapping_addr(node_id_2).unwrap();
32063261

32073262
// Set a low max_idle_timeout so quinn gives up on this quickly and our test does
@@ -3228,24 +3283,25 @@ mod tests {
32283283
info!("first connect timed out as expected");
32293284

32303285
// Provide correct addressing information
3231-
msock_1.node_map.add_node_addr(
3232-
NodeAddr {
3233-
node_id: node_id_2,
3234-
relay_url: None,
3235-
direct_addresses: msock_2
3236-
.direct_addresses()
3237-
.initialized()
3238-
.await
3239-
.expect("no direct addrs")
3240-
.into_iter()
3241-
.map(|x| x.addr)
3242-
.collect(),
3243-
},
3244-
Source::NamedApp {
3245-
name: "test".into(),
3246-
},
3247-
&msock_1.metrics.magicsock,
3248-
);
3286+
msock_1
3287+
.add_node_addr(
3288+
NodeAddr {
3289+
node_id: node_id_2,
3290+
relay_url: None,
3291+
direct_addresses: msock_2
3292+
.direct_addresses()
3293+
.initialized()
3294+
.await
3295+
.expect("no direct addrs")
3296+
.into_iter()
3297+
.map(|x| x.addr)
3298+
.collect(),
3299+
},
3300+
Source::NamedApp {
3301+
name: "test".into(),
3302+
},
3303+
)
3304+
.unwrap();
32493305

32503306
// We can now connect
32513307
tokio::time::timeout(Duration::from_secs(10), async move {

iroh/src/magicsock/node_map.rs

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -180,6 +180,15 @@ impl NodeMap {
180180
.map(|ep| *ep.quic_mapped_addr())
181181
}
182182

183+
pub(super) fn get_direct_addrs(&self, node_key: NodeId) -> Vec<SocketAddr> {
184+
self.inner
185+
.lock()
186+
.expect("poisoned")
187+
.get(NodeStateKey::NodeId(node_key))
188+
.map(|ep| ep.direct_addresses().map(Into::into).collect())
189+
.unwrap_or_default()
190+
}
191+
183192
pub(super) fn handle_call_me_maybe(
184193
&self,
185194
sender: PublicKey,

0 commit comments

Comments
 (0)