Skip to content

Commit c8265ee

Browse files
committed
feat: add connection manager and use in iroh-gossip
1 parent d635d93 commit c8265ee

File tree

7 files changed

+614
-82
lines changed

7 files changed

+614
-82
lines changed

Cargo.lock

Lines changed: 2 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

iroh-gossip/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ iroh-base = { version = "0.16.0", path = "../iroh-base" }
3232

3333
# net dependencies (optional)
3434
futures-lite = { version = "2.3", optional = true }
35+
futures-util = { version = "0.3.30", optional = true }
3536
iroh-net = { path = "../iroh-net", version = "0.16.0", optional = true, default-features = false, features = ["test-utils"] }
3637
tokio = { version = "1", optional = true, features = ["io-util", "sync", "rt", "macros", "net", "fs"] }
3738
tokio-util = { version = "0.7.8", optional = true, features = ["codec"] }
@@ -46,7 +47,7 @@ url = "2.4.0"
4647

4748
[features]
4849
default = ["net"]
49-
net = ["dep:futures-lite", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
50+
net = ["dep:futures-lite", "dep:futures-util", "dep:iroh-net", "dep:tokio", "dep:tokio-util"]
5051

5152
[[example]]
5253
name = "chat"

iroh-gossip/src/net.rs

Lines changed: 93 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,12 @@
22
33
use anyhow::{anyhow, Context};
44
use bytes::{Bytes, BytesMut};
5-
use futures_lite::stream::Stream;
5+
use futures_lite::{stream::Stream, StreamExt};
6+
use futures_util::future::FutureExt;
67
use genawaiter::sync::{Co, Gen};
78
use iroh_net::{
8-
dialer::Dialer,
9-
endpoint::{get_remote_node_id, Connection},
9+
conn_manager::{ConnDirection, ConnInfo, ConnManager},
10+
endpoint::Connection,
1011
key::PublicKey,
1112
AddrInfo, Endpoint, NodeAddr,
1213
};
@@ -15,7 +16,7 @@ use rand_core::SeedableRng;
1516
use std::{collections::HashMap, future::Future, pin::Pin, sync::Arc, task::Poll, time::Instant};
1617
use tokio::{
1718
sync::{broadcast, mpsc, oneshot},
18-
task::JoinHandle,
19+
task::{JoinHandle, JoinSet},
1920
};
2021
use tracing::{debug, error_span, trace, warn, Instrument};
2122

@@ -82,7 +83,7 @@ impl Gossip {
8283
/// Spawn a gossip actor and get a handle for it
8384
pub fn from_endpoint(endpoint: Endpoint, config: proto::Config, my_addr: &AddrInfo) -> Self {
8485
let peer_id = endpoint.node_id();
85-
let dialer = Dialer::new(endpoint.clone());
86+
let conn_manager = ConnManager::new(endpoint.clone(), GOSSIP_ALPN);
8687
let state = proto::State::new(
8788
peer_id,
8889
encode_peer_data(my_addr).unwrap(),
@@ -97,12 +98,12 @@ impl Gossip {
9798
let actor = Actor {
9899
endpoint,
99100
state,
100-
dialer,
101+
conn_manager,
102+
conn_tasks: Default::default(),
101103
to_actor_rx,
102104
in_event_rx,
103105
in_event_tx,
104106
on_endpoints_rx,
105-
conns: Default::default(),
106107
conn_send_tx: Default::default(),
107108
pending_sends: Default::default(),
108109
timers: Timers::new(),
@@ -231,9 +232,7 @@ impl Gossip {
231232
///
232233
/// Make sure to check the ALPN protocol yourself before passing the connection.
233234
pub async fn handle_connection(&self, conn: Connection) -> anyhow::Result<()> {
234-
let peer_id = get_remote_node_id(&conn)?;
235-
self.send(ToActor::ConnIncoming(peer_id, ConnOrigin::Accept, conn))
236-
.await?;
235+
self.send(ToActor::ConnIncoming(conn)).await?;
237236
Ok(())
238237
}
239238

@@ -283,19 +282,11 @@ impl Future for JoinTopicFut {
283282
}
284283
}
285284

286-
/// Whether a connection is initiated by us (Dial) or by the remote peer (Accept)
287-
#[derive(Debug)]
288-
enum ConnOrigin {
289-
Accept,
290-
Dial,
291-
}
292-
293285
/// Input messages for the gossip [`Actor`].
294286
#[derive(derive_more::Debug)]
295287
enum ToActor {
296-
/// Handle a new QUIC connection, either from accept (external to the actor) or from connect
297-
/// (happens internally in the actor).
298-
ConnIncoming(PublicKey, ConnOrigin, #[debug(skip)] Connection),
288+
/// Handle a new incoming QUIC connection.
289+
ConnIncoming(iroh_net::endpoint::Connection),
299290
/// Join a topic with a list of peers. Reply with oneshot once at least one peer joined.
300291
Join(
301292
TopicId,
@@ -329,8 +320,8 @@ struct Actor {
329320
/// Protocol state
330321
state: proto::State<PublicKey, StdRng>,
331322
endpoint: Endpoint,
332-
/// Dial machine to connect to peers
333-
dialer: Dialer,
323+
/// Connection manager to dial and accept connections.
324+
conn_manager: ConnManager,
334325
/// Input messages to the actor
335326
to_actor_rx: mpsc::Receiver<ToActor>,
336327
/// Sender for the state input (cloned into the connection loops)
@@ -341,10 +332,10 @@ struct Actor {
341332
on_endpoints_rx: mpsc::Receiver<Vec<iroh_net::config::Endpoint>>,
342333
/// Queued timers
343334
timers: Timers<Timer>,
344-
/// Currently opened quinn connections to peers
345-
conns: HashMap<PublicKey, Connection>,
346335
/// Channels to send outbound messages into the connection loops
347336
conn_send_tx: HashMap<PublicKey, mpsc::Sender<ProtoMessage>>,
337+
/// Connection loop tasks
338+
conn_tasks: JoinSet<(PublicKey, anyhow::Result<()>)>,
348339
/// Queued messages that were to be sent before a dial completed
349340
pending_sends: HashMap<PublicKey, Vec<ProtoMessage>>,
350341
/// Broadcast senders for active topic subscriptions from the application
@@ -353,6 +344,12 @@ struct Actor {
353344
subscribers_all: Option<broadcast::Sender<(TopicId, Event)>>,
354345
}
355346

347+
impl Drop for Actor {
348+
fn drop(&mut self) {
349+
self.conn_tasks.abort_all();
350+
}
351+
}
352+
356353
impl Actor {
357354
pub async fn run(mut self) -> anyhow::Result<()> {
358355
let mut i = 0;
@@ -384,15 +381,27 @@ impl Actor {
384381
}
385382
}
386383
}
387-
(peer_id, res) = self.dialer.next_conn() => {
388-
trace!(?i, "tick: dialer");
384+
Some(res) = self.conn_manager.next() => {
385+
trace!(?i, "tick: conn_manager");
389386
match res {
390-
Ok(conn) => {
391-
debug!(peer = ?peer_id, "dial successful");
392-
self.handle_to_actor_msg(ToActor::ConnIncoming(peer_id, ConnOrigin::Dial, conn), Instant::now()).await.context("dialer.next -> conn -> handle_to_actor_msg")?;
393-
}
387+
Ok(conn) => self.handle_new_connection(conn).await,
394388
Err(err) => {
395-
warn!(peer = ?peer_id, "dial failed: {err}");
389+
self.handle_in_event(InEvent::PeerDisconnected(err.node_id), Instant::now()).await?;
390+
}
391+
}
392+
}
393+
Some(res) = self.conn_tasks.join_next(), if !self.conn_tasks.is_empty() => {
394+
match res {
395+
Err(err) if !err.is_cancelled() => warn!(?err, "connection loop panicked"),
396+
Err(_err) => {},
397+
Ok((node_id, result)) => {
398+
self.conn_manager.remove(&node_id);
399+
self.conn_send_tx.remove(&node_id);
400+
self.handle_in_event(InEvent::PeerDisconnected(node_id), Instant::now()).await?;
401+
match result {
402+
Ok(()) => debug!(peer=%node_id.fmt_short(), "connection closed without error"),
403+
Err(err) => debug!(peer=%node_id.fmt_short(), "connection closed with error {err:?}"),
404+
}
396405
}
397406
}
398407
}
@@ -421,38 +430,9 @@ impl Actor {
421430
async fn handle_to_actor_msg(&mut self, msg: ToActor, now: Instant) -> anyhow::Result<()> {
422431
trace!("handle to_actor {msg:?}");
423432
match msg {
424-
ToActor::ConnIncoming(peer_id, origin, conn) => {
425-
self.conns.insert(peer_id, conn.clone());
426-
self.dialer.abort_dial(&peer_id);
427-
let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
428-
self.conn_send_tx.insert(peer_id, send_tx.clone());
429-
430-
// Spawn a task for this connection
431-
let in_event_tx = self.in_event_tx.clone();
432-
tokio::spawn(
433-
async move {
434-
debug!("connection established");
435-
match connection_loop(peer_id, conn, origin, send_rx, &in_event_tx).await {
436-
Ok(()) => {
437-
debug!("connection closed without error")
438-
}
439-
Err(err) => {
440-
debug!("connection closed with error {err:?}")
441-
}
442-
}
443-
in_event_tx
444-
.send(InEvent::PeerDisconnected(peer_id))
445-
.await
446-
.ok();
447-
}
448-
.instrument(error_span!("gossip_conn", peer = %peer_id.fmt_short())),
449-
);
450-
451-
// Forward queued pending sends
452-
if let Some(send_queue) = self.pending_sends.remove(&peer_id) {
453-
for msg in send_queue {
454-
send_tx.send(msg).await?;
455-
}
433+
ToActor::ConnIncoming(conn) => {
434+
if let Err(err) = self.conn_manager.accept(conn) {
435+
warn!(?err, "failed to accept connection");
456436
}
457437
}
458438
ToActor::Join(topic_id, peers, reply) => {
@@ -502,9 +482,6 @@ impl Actor {
502482
} else {
503483
debug!("handle in_event {event:?}");
504484
};
505-
if let InEvent::PeerDisconnected(peer) = &event {
506-
self.conn_send_tx.remove(peer);
507-
}
508485
let out = self.state.handle(event, now);
509486
for event in out {
510487
if matches!(event, OutEvent::ScheduleTimer(_, _)) {
@@ -518,10 +495,13 @@ impl Actor {
518495
if let Err(_err) = send.send(message).await {
519496
warn!("conn receiver for {peer_id:?} dropped");
520497
self.conn_send_tx.remove(&peer_id);
498+
self.conn_manager.remove(&peer_id);
521499
}
522500
} else {
523-
debug!(peer = ?peer_id, "dial");
524-
self.dialer.queue_dial(peer_id, GOSSIP_ALPN);
501+
if !self.conn_manager.is_pending(&peer_id) {
502+
debug!(peer = ?peer_id, "dial");
503+
self.conn_manager.dial(peer_id);
504+
}
525505
// TODO: Enforce max length
526506
self.pending_sends.entry(peer_id).or_default().push(message);
527507
}
@@ -544,12 +524,11 @@ impl Actor {
544524
self.timers.insert(now + delay, timer);
545525
}
546526
OutEvent::DisconnectPeer(peer) => {
547-
if let Some(conn) = self.conns.remove(&peer) {
548-
conn.close(0u8.into(), b"close from disconnect");
549-
}
550527
self.conn_send_tx.remove(&peer);
551528
self.pending_sends.remove(&peer);
552-
self.dialer.abort_dial(&peer);
529+
if let Some(conn) = self.conn_manager.remove(&peer) {
530+
conn.close(0u8.into(), b"close from disconnect");
531+
}
553532
}
554533
OutEvent::PeerData(node_id, data) => match decode_peer_data(&data) {
555534
Err(err) => warn!("Failed to decode {data:?} from {node_id}: {err}"),
@@ -566,6 +545,33 @@ impl Actor {
566545
Ok(())
567546
}
568547

548+
async fn handle_new_connection(&mut self, new_conn: ConnInfo) {
549+
let ConnInfo {
550+
conn,
551+
node_id,
552+
direction,
553+
} = new_conn;
554+
let (send_tx, send_rx) = mpsc::channel(SEND_QUEUE_CAP);
555+
self.conn_send_tx.insert(node_id, send_tx.clone());
556+
557+
// Spawn a task for this connection
558+
let pending_sends = self.pending_sends.remove(&node_id);
559+
let in_event_tx = self.in_event_tx.clone();
560+
debug!(peer=%node_id.fmt_short(), ?direction, "connection established");
561+
self.conn_tasks.spawn(
562+
connection_loop(
563+
node_id,
564+
conn,
565+
direction,
566+
send_rx,
567+
in_event_tx,
568+
pending_sends,
569+
)
570+
.map(move |r| (node_id, r))
571+
.instrument(error_span!("gossip_conn", peer = %node_id.fmt_short())),
572+
);
573+
}
574+
569575
fn subscribe_all(&mut self) -> broadcast::Receiver<(TopicId, Event)> {
570576
if let Some(tx) = self.subscribers_all.as_mut() {
571577
tx.subscribe()
@@ -602,16 +608,26 @@ async fn wait_for_neighbor_up(mut sub: broadcast::Receiver<Event>) -> anyhow::Re
602608
async fn connection_loop(
603609
from: PublicKey,
604610
conn: Connection,
605-
origin: ConnOrigin,
611+
direction: ConnDirection,
606612
mut send_rx: mpsc::Receiver<ProtoMessage>,
607-
in_event_tx: &mpsc::Sender<InEvent>,
613+
in_event_tx: mpsc::Sender<InEvent>,
614+
mut pending_sends: Option<Vec<ProtoMessage>>,
608615
) -> anyhow::Result<()> {
609-
let (mut send, mut recv) = match origin {
610-
ConnOrigin::Accept => conn.accept_bi().await?,
611-
ConnOrigin::Dial => conn.open_bi().await?,
616+
let (mut send, mut recv) = match direction {
617+
ConnDirection::Accept => conn.accept_bi().await?,
618+
ConnDirection::Dial => conn.open_bi().await?,
612619
};
613620
let mut send_buf = BytesMut::new();
614621
let mut recv_buf = BytesMut::new();
622+
623+
// Forward queued pending sends
624+
if let Some(mut send_queue) = pending_sends.take() {
625+
for msg in send_queue.drain(..) {
626+
write_message(&mut send, &mut send_buf, &msg).await?;
627+
}
628+
}
629+
630+
// loop over sending and receiving messages
615631
loop {
616632
tokio::select! {
617633
biased;

iroh-net/Cargo.toml

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,10 @@ quinn = { package = "iroh-quinn", version = "0.10.4" }
5353
quinn-proto = { package = "iroh-quinn-proto", version = "0.10.7" }
5454
quinn-udp = { package = "iroh-quinn-udp", version = "0.4" }
5555
rand = "0.8"
56+
rand_chacha = { version = "0.3.1", optional = true }
5657
rand_core = "0.6.4"
5758
rcgen = "0.11"
58-
reqwest = { version = "0.12.4", default-features = false, features = ["rustls-tls"] }
59+
reqwest = { version = "0.11.19", default-features = false, features = ["rustls-tls"] }
5960
ring = "0.17"
6061
rustls = { version = "0.21.11", default-features = false, features = ["dangerous_configuration"] }
6162
serde = { version = "1", features = ["derive", "rc"] }
@@ -125,7 +126,7 @@ duct = "0.13.6"
125126
default = ["metrics"]
126127
iroh-relay = ["clap", "toml", "rustls-pemfile", "regex", "serde_with", "tracing-subscriber"]
127128
metrics = ["iroh-metrics/metrics"]
128-
test-utils = ["axum"]
129+
test-utils = ["axum", "rand_chacha"]
129130

130131
[[bin]]
131132
name = "iroh-relay"

0 commit comments

Comments
 (0)