Skip to content

Commit 2d8f739

Browse files
start opening paths
1 parent 7e40d68 commit 2d8f739

File tree

2 files changed

+93
-24
lines changed

2 files changed

+93
-24
lines changed

iroh/src/endpoint.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1768,6 +1768,17 @@ impl Future for Connecting {
17681768
Poll::Ready(Err(err)) => Poll::Ready(Err(err)),
17691769
Poll::Ready(Ok(inner)) => {
17701770
let conn = Connection { inner };
1771+
1772+
// Grab the remote identity and register this connection
1773+
1774+
if let Some(remote) = *this.remote_node_id {
1775+
let weak_handle = conn.inner.weak_handle();
1776+
this.ep.msock.register_connection(remote, weak_handle);
1777+
} else if let Ok(remote) = conn.remote_node_id() {
1778+
let weak_handle = conn.inner.weak_handle();
1779+
this.ep.msock.register_connection(remote, weak_handle);
1780+
}
1781+
17711782
try_send_rtt_msg(&conn, this.ep, *this.remote_node_id);
17721783
Poll::Ready(Ok(conn))
17731784
}

iroh/src/magicsock.rs

Lines changed: 82 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ use std::{
2222
net::{IpAddr, Ipv4Addr, Ipv6Addr, SocketAddr, SocketAddrV4, SocketAddrV6},
2323
pin::Pin,
2424
sync::{
25-
Arc, Mutex, RwLock,
2625
atomic::{AtomicBool, AtomicU64, Ordering},
26+
Arc, Mutex, RwLock,
2727
},
2828
task::{Context, Poll},
2929
};
@@ -33,24 +33,24 @@ use data_encoding::HEXLOWER;
3333
use iroh_base::{NodeAddr, NodeId, PublicKey, RelayUrl, SecretKey};
3434
use iroh_relay::RelayMap;
3535
use n0_future::{
36-
StreamExt,
3736
boxed::BoxStream,
3837
task::{self, AbortOnDropHandle},
3938
time::{self, Duration, Instant},
39+
StreamExt,
4040
};
4141
use n0_watcher::{self, Watchable, Watcher};
4242
use nested_enum_utils::common_fields;
4343
use netwatch::netmon;
4444
#[cfg(not(wasm_browser))]
45-
use netwatch::{UdpSocket, ip::LocalAddresses};
46-
use quinn::{AsyncUdpSocket, ServerConfig};
45+
use netwatch::{ip::LocalAddresses, UdpSocket};
46+
use quinn::{AsyncUdpSocket, ServerConfig, WeakConnectionHandle};
4747
use rand::Rng;
4848
use smallvec::SmallVec;
4949
use snafu::{ResultExt, Snafu};
50-
use tokio::sync::{Mutex as AsyncMutex, mpsc};
50+
use tokio::sync::{mpsc, Mutex as AsyncMutex};
5151
use tokio_util::sync::CancellationToken;
5252
use tracing::{
53-
Instrument, Level, debug, error, event, info, info_span, instrument, trace, trace_span, warn,
53+
debug, error, event, info, info_span, instrument, trace, trace_span, warn, Instrument, Level,
5454
};
5555
use transports::LocalAddrsWatch;
5656
use url::Url;
@@ -72,7 +72,7 @@ use crate::{
7272
defaults::timeouts::NET_REPORT_TIMEOUT,
7373
disco::{self, SendAddr},
7474
discovery::{Discovery, DiscoveryItem, DiscoverySubscribers, NodeData, UserData},
75-
key::{DecryptionError, SharedSecret, public_ed_box, secret_ed_box},
75+
key::{public_ed_box, secret_ed_box, DecryptionError, SharedSecret},
7676
metrics::EndpointMetrics,
7777
net_report::{self, IfStateDetails, IpMappedAddresses, Report},
7878
};
@@ -199,6 +199,9 @@ pub(crate) struct MagicSock {
199199
ipv6_reported: Arc<AtomicBool>,
200200
/// Tracks the networkmap node entity for each node discovery key.
201201
node_map: NodeMap,
202+
/// Tracks existing connections
203+
connection_map: ConnectionMap,
204+
202205
/// Tracks the mapped IP addresses
203206
ip_mapped_addrs: IpMappedAddresses,
204207
/// Local addresses
@@ -225,6 +228,22 @@ pub(crate) struct MagicSock {
225228
pub(crate) metrics: EndpointMetrics,
226229
}
227230

231+
#[derive(Default, Debug)]
232+
struct ConnectionMap {
233+
map: std::sync::Mutex<BTreeMap<NodeId, Vec<WeakConnectionHandle>>>,
234+
}
235+
236+
impl ConnectionMap {
237+
fn insert(&self, remote: NodeId, handle: WeakConnectionHandle) {
238+
self.map
239+
.lock()
240+
.expect("poisoned")
241+
.entry(remote)
242+
.or_default()
243+
.push(handle);
244+
}
245+
}
246+
228247
#[allow(missing_docs)]
229248
#[common_fields({
230249
backtrace: Option<snafu::Backtrace>,
@@ -275,6 +294,10 @@ impl MagicSock {
275294
self.local_addrs_watch.get().expect("disconnected")
276295
}
277296

297+
pub(crate) fn register_connection(&self, remote: NodeId, conn: WeakConnectionHandle) {
298+
self.connection_map.insert(remote, conn);
299+
}
300+
278301
#[cfg(not(wasm_browser))]
279302
fn ip_bind_addrs(&self) -> &[SocketAddr] {
280303
&self.ip_bind_addrs
@@ -397,8 +420,45 @@ impl MagicSock {
397420
}
398421
}
399422
if !addr.is_empty() {
423+
// Add addr to the internal NodeMap
400424
self.node_map
401-
.add_node_addr(addr, source, &self.metrics.magicsock);
425+
.add_node_addr(addr.clone(), source, &self.metrics.magicsock);
426+
427+
// Add paths to the existing connections
428+
{
429+
let mut map = self.connection_map.map.lock().expect("poisoned");
430+
let mut to_delete = Vec::new();
431+
if let Some(conns) = map.get_mut(&addr.node_id) {
432+
for (i, conn) in conns.into_iter().enumerate() {
433+
if let Some(conn) = conn.upgrade() {
434+
for addr in addr.direct_addresses() {
435+
let conn = conn.clone();
436+
let addr = *addr;
437+
task::spawn(async move {
438+
if let Err(err) = conn
439+
.open_path(addr, quinn_proto::PathStatus::Available)
440+
.await
441+
{
442+
warn!("failed to open path {:?}", err);
443+
}
444+
});
445+
}
446+
// TODO: add relay path as mapped addr
447+
} else {
448+
to_delete.push(i);
449+
}
450+
}
451+
// cleanup dead connections
452+
let mut i = 0;
453+
conns.retain(|_| {
454+
let remove = to_delete.contains(&i);
455+
i += 1;
456+
457+
!remove
458+
});
459+
}
460+
}
461+
402462
Ok(())
403463
} else if pruned != 0 {
404464
Err(EmptyPrunedSnafu { pruned }.build())
@@ -509,8 +569,8 @@ impl MagicSock {
509569
let mut active_paths = SmallVec::<[_; 3]>::new();
510570

511571
match MappedAddr::from(transmit.destination) {
512-
MappedAddr::None(dest) => {
513-
error!(%dest, "Cannot convert to a mapped address.");
572+
MappedAddr::None(addr) => {
573+
active_paths.push(transports::Addr::from(addr));
514574
}
515575
MappedAddr::NodeId(dest) => {
516576
trace!(
@@ -527,15 +587,14 @@ impl MagicSock {
527587
self.ipv6_reported.load(Ordering::Relaxed),
528588
&self.metrics.magicsock,
529589
) {
530-
Some((node_id, udp_addr, relay_url, ping_actions)) => {
590+
Some((node_id, _udp_addr, relay_url, ping_actions)) => {
531591
if !ping_actions.is_empty() {
532592
self.actor_sender
533593
.try_send(ActorMessage::PingActions(ping_actions))
534594
.ok();
535595
}
536-
if let Some(addr) = udp_addr {
537-
active_paths.push(transports::Addr::from(addr));
538-
}
596+
// NodeId mapped addrs are only used for relays, currently.
597+
// IP based addrs will have been added as individual paths
539598
if let Some(url) = relay_url {
540599
active_paths.push(transports::Addr::Relay(url, node_id));
541600
}
@@ -1297,6 +1356,7 @@ impl Handle {
12971356
actor_sender: actor_sender.clone(),
12981357
ipv6_reported,
12991358
node_map,
1359+
connection_map: Default::default(),
13001360
ip_mapped_addrs: ip_mapped_addrs.clone(),
13011361
discovery,
13021362
discovery_user_data: RwLock::new(discovery_user_data),
@@ -2415,23 +2475,22 @@ mod tests {
24152475

24162476
use data_encoding::HEXLOWER;
24172477
use iroh_base::{NodeAddr, NodeId, PublicKey};
2418-
use n0_future::{StreamExt, time};
2478+
use n0_future::{time, StreamExt};
24192479
use n0_snafu::{Result, ResultExt};
24202480
use n0_watcher::Watcher;
24212481
use quinn::ServerConfig;
24222482
use rand::{Rng, RngCore};
24232483
use tokio::task::JoinSet;
24242484
use tokio_util::task::AbortOnDropHandle;
2425-
use tracing::{Instrument, error, info, info_span, instrument};
2485+
use tracing::{error, info, info_span, instrument, Instrument};
24262486
use tracing_test::traced_test;
24272487

24282488
use super::{NodeIdMappedAddr, Options};
24292489
use crate::{
2430-
Endpoint, RelayMap, RelayMode, SecretKey,
24312490
dns::DnsResolver,
24322491
endpoint::{DirectAddr, PathSelection, Source},
2433-
magicsock::{Handle, MagicSock, node_map},
2434-
tls,
2492+
magicsock::{node_map, Handle, MagicSock},
2493+
tls, Endpoint, RelayMap, RelayMode, SecretKey,
24352494
};
24362495

24372496
const ALPN: &[u8] = b"n0/test/1";
@@ -3277,11 +3336,10 @@ mod tests {
32773336
.magic_sock()
32783337
.add_node_addr(empty_addr, node_map::Source::App)
32793338
.unwrap_err();
3280-
assert!(
3281-
err.to_string()
3282-
.to_lowercase()
3283-
.contains("empty addressing info")
3284-
);
3339+
assert!(err
3340+
.to_string()
3341+
.to_lowercase()
3342+
.contains("empty addressing info"));
32853343

32863344
// relay url only
32873345
let addr = NodeAddr {

0 commit comments

Comments
 (0)