From 243c88f3f0216c8659e619d386189988bbbf3ca1 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 14 May 2025 10:37:16 +0200 Subject: [PATCH 01/10] refactor: use a separate queue for inbound disco packets from relays --- iroh/src/disco.rs | 14 ++++++ iroh/src/magicsock/relay_actor.rs | 77 +++++++++++++++++++++++++++---- 2 files changed, 83 insertions(+), 8 deletions(-) diff --git a/iroh/src/disco.rs b/iroh/src/disco.rs index f1beaedf778..b5c510be7a6 100644 --- a/iroh/src/disco.rs +++ b/iroh/src/disco.rs @@ -24,6 +24,7 @@ use std::{ }; use anyhow::{anyhow, bail, ensure, Context, Result}; +use bytes::Bytes; use data_encoding::HEXLOWER; use iroh_base::{PublicKey, RelayUrl}; use serde::{Deserialize, Serialize}; @@ -102,6 +103,19 @@ pub fn source_and_box(p: &[u8]) -> Option<(PublicKey, &[u8])> { Some((sender, sealed_box)) } +/// If `p` looks like a disco message it returns the slice of `p` that represents the disco public key source, +/// and the part that is the box. +pub fn source_and_box_bytes(p: &Bytes) -> Option<(PublicKey, Bytes)> { + if !looks_like_disco_wrapper(&p) { + return None; + } + + let source = &p[MAGIC_LEN..MAGIC_LEN + KEY_LEN]; + let sender = PublicKey::try_from(source).ok()?; + let sealed_box = p.slice(MAGIC_LEN + KEY_LEN..); + Some((sender, sealed_box)) +} + /// A discovery message. #[derive(Debug, Clone, PartialEq, Eq)] pub enum Message { diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index d603724fe19..94e47a7bacd 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -60,7 +60,10 @@ use super::RelayDatagramSendChannelReceiver; #[cfg(not(wasm_browser))] use crate::dns::DnsResolver; use crate::{ - magicsock::{MagicSock, Metrics as MagicsockMetrics, RelayContents, RelayDatagramRecvQueue}, + magicsock::{ + DiscoMessageSource, MagicSock, Metrics as MagicsockMetrics, RelayContents, + RelayDatagramRecvQueue, + }, util::MaybeFuture, }; @@ -135,6 +138,8 @@ struct ActiveRelayActor { inbox: mpsc::Receiver, /// Queue for received relay datagrams. relay_datagrams_recv: Arc, + /// Queue for received relay disco packets. + relay_disco_recv: mpsc::Sender, /// Channel on which we queue packets to send to the relay. relay_datagrams_send: mpsc::Receiver, @@ -159,6 +164,14 @@ struct ActiveRelayActor { metrics: Arc, } +#[derive(Debug)] +struct RelayDiscoMessage { + source: PublicKey, + sealed_box: Bytes, + relay_url: RelayUrl, + relay_remote_node_id: PublicKey, +} + #[derive(Debug)] enum ActiveRelayMessage { /// Triggers a connection check to the relay server. 
@@ -197,6 +210,7 @@ struct ActiveRelayActorOptions { inbox: mpsc::Receiver, relay_datagrams_send: mpsc::Receiver, relay_datagrams_recv: Arc, + relay_disco_recv: mpsc::Sender, connection_opts: RelayConnectionOptions, stop_token: CancellationToken, metrics: Arc, @@ -234,6 +248,7 @@ impl ActiveRelayActor { inbox, relay_datagrams_send, relay_datagrams_recv, + relay_disco_recv, connection_opts, stop_token, metrics, @@ -244,6 +259,7 @@ impl ActiveRelayActor { inbox, relay_datagrams_recv, relay_datagrams_send, + relay_disco_recv, url, relay_client_builder, is_home_relay: false, @@ -646,13 +662,33 @@ impl ActiveRelayActor { state.last_packet_src = Some(remote_node_id); state.nodes_present.insert(remote_node_id); } - for datagram in PacketSplitIter::new(self.url.clone(), remote_node_id, data) { - let Ok(datagram) = datagram else { - warn!("Invalid packet split"); - break; - }; - if let Err(err) = self.relay_datagrams_recv.try_send(datagram) { - warn!("Dropping received relay packet: {err:#}"); + match crate::disco::source_and_box_bytes(&data) { + Some((source, sealed_box)) => { + if remote_node_id != source { + // TODO: return here? + warn!("Received relay disco message from connection for {}, but with message from {}", remote_node_id.fmt_short(), source.fmt_short()); + } + let message = RelayDiscoMessage { + source, + sealed_box, + relay_url: self.url.clone(), + relay_remote_node_id: remote_node_id, + }; + if let Err(err) = self.relay_disco_recv.try_send(message) { + warn!("Dropping received relay disco packet: {err:#}"); + } + } + None => { + for datagram in PacketSplitIter::new(self.url.clone(), remote_node_id, data) + { + let Ok(datagram) = datagram else { + warn!("Invalid packet split"); + break; + }; + if let Err(err) = self.relay_datagrams_recv.try_send(datagram) { + warn!("Dropping received relay data packet: {err:#}"); + } + } } } } @@ -814,6 +850,8 @@ pub(super) struct RelayActor { /// /// [`AsyncUdpSocket::poll_recv`]: quinn::AsyncUdpSocket::poll_recv relay_datagram_recv_queue: Arc, + relay_disco_recv_tx: mpsc::Sender, + relay_disco_recv_rx: mpsc::Receiver, /// The actors managing each currently used relay server. /// /// These actors will exit when they have any inactivity. Otherwise they will keep @@ -832,9 +870,12 @@ impl RelayActor { protocol: iroh_relay::http::Protocol, ) -> Self { let cancel_token = CancellationToken::new(); + let (relay_disco_recv_tx, relay_disco_recv_rx) = mpsc::channel(1024); Self { msock, relay_datagram_recv_queue, + relay_disco_recv_tx, + relay_disco_recv_rx, active_relays: Default::default(), active_relay_tasks: JoinSet::new(), cancel_token, @@ -862,6 +903,17 @@ impl RelayActor { debug!("shutting down"); break; } + Some(message) = self.relay_disco_recv_rx.recv() => { + self.msock. 
+ handle_disco_message( + message.source, + &message.sealed_box, + DiscoMessageSource::Relay { + url: message.relay_url, + key: message.relay_remote_node_id + }, + ); + } Some(res) = self.active_relay_tasks.join_next() => { match res { Ok(()) => (), @@ -1056,6 +1108,7 @@ impl RelayActor { inbox: inbox_rx, relay_datagrams_send: send_datagram_rx, relay_datagrams_recv: self.relay_datagram_recv_queue.clone(), + relay_disco_recv: self.relay_disco_recv_tx.clone(), connection_opts, stop_token: self.cancel_token.child_token(), metrics: self.msock.metrics.magicsock.clone(), @@ -1331,6 +1384,7 @@ mod tests { inbox_rx: mpsc::Receiver, relay_datagrams_send: mpsc::Receiver, relay_datagrams_recv: Arc, + relay_disco_recv: mpsc::Sender, span: tracing::Span, ) -> AbortOnDropHandle> { let opts = ActiveRelayActorOptions { @@ -1339,6 +1393,7 @@ mod tests { inbox: inbox_rx, relay_datagrams_send, relay_datagrams_recv, + relay_disco_recv, connection_opts: RelayConnectionOptions { secret_key, dns_resolver: DnsResolver::new(), @@ -1363,6 +1418,7 @@ mod tests { let secret_key = SecretKey::from_bytes(&[8u8; 32]); let recv_datagram_queue = Arc::new(RelayDatagramRecvQueue::new()); let (send_datagram_tx, send_datagram_rx) = mpsc::channel(16); + let (relay_disco_recv_tx, _relay_disco_recv_rx) = mpsc::channel(16); let (prio_inbox_tx, prio_inbox_rx) = mpsc::channel(8); let (inbox_tx, inbox_rx) = mpsc::channel(16); let cancel_token = CancellationToken::new(); @@ -1374,6 +1430,7 @@ mod tests { inbox_rx, send_datagram_rx, recv_datagram_queue.clone(), + relay_disco_recv_tx, info_span!("echo-node"), ); let echo_task = tokio::spawn({ @@ -1455,6 +1512,7 @@ mod tests { let secret_key = SecretKey::from_bytes(&[1u8; 32]); let datagram_recv_queue = Arc::new(RelayDatagramRecvQueue::new()); + let (relay_disco_recv_tx, _relay_disco_recv_rx) = mpsc::channel(16); let (send_datagram_tx, send_datagram_rx) = mpsc::channel(16); let (_prio_inbox_tx, prio_inbox_rx) = mpsc::channel(8); let (inbox_tx, inbox_rx) = mpsc::channel(16); @@ -1467,6 +1525,7 @@ mod tests { inbox_rx, send_datagram_rx, datagram_recv_queue.clone(), + relay_disco_recv_tx, info_span!("actor-under-test"), ); @@ -1541,6 +1600,7 @@ mod tests { let secret_key = SecretKey::from_bytes(&[1u8; 32]); let datagram_recv_queue = Arc::new(RelayDatagramRecvQueue::new()); + let (relay_disco_recv_tx, _relay_disco_recv_rx) = mpsc::channel(16); let (_send_datagram_tx, send_datagram_rx) = mpsc::channel(16); let (_prio_inbox_tx, prio_inbox_rx) = mpsc::channel(8); let (inbox_tx, inbox_rx) = mpsc::channel(16); @@ -1553,6 +1613,7 @@ mod tests { inbox_rx, send_datagram_rx, datagram_recv_queue.clone(), + relay_disco_recv_tx, info_span!("actor-under-test"), ); From d6769b832491055d099aa0465d55f55afdbab411 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 14 May 2025 10:39:27 +0200 Subject: [PATCH 02/10] fix: do not double-check if relay packets are disco packets --- iroh/src/magicsock.rs | 31 ------------------------------- 1 file changed, 31 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index 9cda164c82d..fe2c75ebce3 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -1094,11 +1094,6 @@ impl MagicSock { return None; } - if self.handle_relay_disco_message(&dm.buf, &dm.url, dm.src) { - // DISCO messages are handled internally in the MagicSock, do not pass to Quinn. 
- return None; - } - let quic_mapped_addr = self.node_map.receive_relay(&dm.url, dm.src); // Normalize local_ip @@ -1119,32 +1114,6 @@ impl MagicSock { Some((dm.src, meta, dm.buf)) } - fn handle_relay_disco_message( - &self, - msg: &[u8], - url: &RelayUrl, - relay_node_src: PublicKey, - ) -> bool { - match disco::source_and_box(msg) { - Some((source, sealed_box)) => { - if relay_node_src != source { - // TODO: return here? - warn!("Received relay disco message from connection for {}, but with message from {}", relay_node_src.fmt_short(), source.fmt_short()); - } - self.handle_disco_message( - source, - sealed_box, - DiscoMessageSource::Relay { - url: url.clone(), - key: relay_node_src, - }, - ); - true - } - None => false, - } - } - /// Handles a discovery message. #[instrument("disco_in", skip_all, fields(node = %sender.fmt_short(), %src))] fn handle_disco_message(&self, sender: PublicKey, sealed_box: &[u8], src: DiscoMessageSource) { From e36539252166eac1afd6ef8cca17b48cc687dd13 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 14 May 2025 10:46:35 +0200 Subject: [PATCH 03/10] fix: first split then test for disco --- iroh/src/magicsock/relay_actor.rs | 43 +++++++++++++++---------------- 1 file changed, 21 insertions(+), 22 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 94e47a7bacd..3d27a49f3e8 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -662,29 +662,28 @@ impl ActiveRelayActor { state.last_packet_src = Some(remote_node_id); state.nodes_present.insert(remote_node_id); } - match crate::disco::source_and_box_bytes(&data) { - Some((source, sealed_box)) => { - if remote_node_id != source { - // TODO: return here? - warn!("Received relay disco message from connection for {}, but with message from {}", remote_node_id.fmt_short(), source.fmt_short()); - } - let message = RelayDiscoMessage { - source, - sealed_box, - relay_url: self.url.clone(), - relay_remote_node_id: remote_node_id, - }; - if let Err(err) = self.relay_disco_recv.try_send(message) { - warn!("Dropping received relay disco packet: {err:#}"); - } - } - None => { - for datagram in PacketSplitIter::new(self.url.clone(), remote_node_id, data) - { - let Ok(datagram) = datagram else { - warn!("Invalid packet split"); - break; + for datagram in PacketSplitIter::new(self.url.clone(), remote_node_id, data) { + let Ok(datagram) = datagram else { + warn!("Invalid packet split"); + break; + }; + match crate::disco::source_and_box_bytes(&datagram.buf) { + Some((source, sealed_box)) => { + if remote_node_id != source { + // TODO: return here? 
+ warn!("Received relay disco message from connection for {}, but with message from {}", remote_node_id.fmt_short(), source.fmt_short()); + } + let message = RelayDiscoMessage { + source, + sealed_box, + relay_url: datagram.url.clone(), + relay_remote_node_id: datagram.src, }; + if let Err(err) = self.relay_disco_recv.try_send(message) { + warn!("Dropping received relay disco packet: {err:#}"); + } + } + None => { if let Err(err) = self.relay_datagrams_recv.try_send(datagram) { warn!("Dropping received relay data packet: {err:#}"); } From e5920696957891ed3753e01c67539c3616a36317 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 14 May 2025 10:56:20 +0200 Subject: [PATCH 04/10] refactor: use a separate task for the relay disco recv queue --- iroh/src/magicsock.rs | 24 +++++++++++++++++++++++- iroh/src/magicsock/relay_actor.rs | 30 +++++++----------------------- 2 files changed, 30 insertions(+), 24 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index fe2c75ebce3..ffd8e7e5e5b 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -1796,7 +1796,13 @@ impl Handle { let mut actor_tasks = JoinSet::default(); - let relay_actor = RelayActor::new(msock.clone(), relay_datagram_recv_queue, relay_protocol); + let (relay_disco_recv_tx, mut relay_disco_recv_rx) = tokio::sync::mpsc::channel(1024); + let relay_actor = RelayActor::new( + msock.clone(), + relay_datagram_recv_queue, + relay_disco_recv_tx, + relay_protocol, + ); let relay_actor_cancel_token = relay_actor.cancel_token(); actor_tasks.spawn( async move { @@ -1806,6 +1812,22 @@ impl Handle { } .instrument(info_span!("relay-actor")), ); + actor_tasks.spawn({ + let msock = msock.clone(); + async move { + while let Some(message) = relay_disco_recv_rx.recv().await { + msock.handle_disco_message( + message.source, + &message.sealed_box, + DiscoMessageSource::Relay { + url: message.relay_url, + key: message.relay_remote_node_id, + }, + ); + } + } + .instrument(info_span!("relay-disco-recv")) + }); #[cfg(not(wasm_browser))] let _ = actor_tasks.spawn({ diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 3d27a49f3e8..46b6e4d4c8f 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -60,10 +60,7 @@ use super::RelayDatagramSendChannelReceiver; #[cfg(not(wasm_browser))] use crate::dns::DnsResolver; use crate::{ - magicsock::{ - DiscoMessageSource, MagicSock, Metrics as MagicsockMetrics, RelayContents, - RelayDatagramRecvQueue, - }, + magicsock::{MagicSock, Metrics as MagicsockMetrics, RelayContents, RelayDatagramRecvQueue}, util::MaybeFuture, }; @@ -165,11 +162,11 @@ struct ActiveRelayActor { } #[derive(Debug)] -struct RelayDiscoMessage { - source: PublicKey, - sealed_box: Bytes, - relay_url: RelayUrl, - relay_remote_node_id: PublicKey, +pub(super) struct RelayDiscoMessage { + pub(super) source: PublicKey, + pub(super) sealed_box: Bytes, + pub(super) relay_url: RelayUrl, + pub(super) relay_remote_node_id: PublicKey, } #[derive(Debug)] @@ -850,7 +847,6 @@ pub(super) struct RelayActor { /// [`AsyncUdpSocket::poll_recv`]: quinn::AsyncUdpSocket::poll_recv relay_datagram_recv_queue: Arc, relay_disco_recv_tx: mpsc::Sender, - relay_disco_recv_rx: mpsc::Receiver, /// The actors managing each currently used relay server. /// /// These actors will exit when they have any inactivity. 
Otherwise they will keep @@ -866,15 +862,14 @@ impl RelayActor { pub(super) fn new( msock: Arc, relay_datagram_recv_queue: Arc, + relay_disco_recv_tx: mpsc::Sender, protocol: iroh_relay::http::Protocol, ) -> Self { let cancel_token = CancellationToken::new(); - let (relay_disco_recv_tx, relay_disco_recv_rx) = mpsc::channel(1024); Self { msock, relay_datagram_recv_queue, relay_disco_recv_tx, - relay_disco_recv_rx, active_relays: Default::default(), active_relay_tasks: JoinSet::new(), cancel_token, @@ -902,17 +897,6 @@ impl RelayActor { debug!("shutting down"); break; } - Some(message) = self.relay_disco_recv_rx.recv() => { - self.msock. - handle_disco_message( - message.source, - &message.sealed_box, - DiscoMessageSource::Relay { - url: message.relay_url, - key: message.relay_remote_node_id - }, - ); - } Some(res) = self.active_relay_tasks.join_next() => { match res { Ok(()) => (), From f4034ef9f238ae5982df72ca17e2dac9be756528 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 14 May 2025 11:00:03 +0200 Subject: [PATCH 05/10] chore: clippy --- iroh/src/disco.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iroh/src/disco.rs b/iroh/src/disco.rs index b5c510be7a6..eb6087b18c4 100644 --- a/iroh/src/disco.rs +++ b/iroh/src/disco.rs @@ -106,7 +106,7 @@ pub fn source_and_box(p: &[u8]) -> Option<(PublicKey, &[u8])> { /// If `p` looks like a disco message it returns the slice of `p` that represents the disco public key source, /// and the part that is the box. pub fn source_and_box_bytes(p: &Bytes) -> Option<(PublicKey, Bytes)> { - if !looks_like_disco_wrapper(&p) { + if !looks_like_disco_wrapper(p) { return None; } From c5ebe47208d5fddddd7fafb7d909ff3e6b4c95b4 Mon Sep 17 00:00:00 2001 From: Frando Date: Wed, 14 May 2025 11:21:46 +0200 Subject: [PATCH 06/10] debug log actor exit --- iroh/src/magicsock.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index ffd8e7e5e5b..ea89e238f16 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -1825,6 +1825,7 @@ impl Handle { }, ); } + debug!("relay-disco-recv actor closed"); } .instrument(info_span!("relay-disco-recv")) }); From 68b164cf734d3d7bbb765a31a4418e95b0507926 Mon Sep 17 00:00:00 2001 From: Frando Date: Thu, 15 May 2025 14:29:51 +0200 Subject: [PATCH 07/10] refactor: apply backpressure without blocking the actor loop --- iroh/src/magicsock.rs | 37 ++++-- iroh/src/magicsock/relay_actor.rs | 179 ++++++++++++++++++++++++------ 2 files changed, 177 insertions(+), 39 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index ea89e238f16..a4e0edf31b5 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -2115,7 +2115,8 @@ impl RelayDatagramSendChannelReceiver { #[derive(Debug)] struct RelayDatagramRecvQueue { queue: ConcurrentQueue, - waker: AtomicWaker, + recv_waker: AtomicWaker, + send_waker: AtomicWaker, } impl RelayDatagramRecvQueue { @@ -2123,7 +2124,8 @@ impl RelayDatagramRecvQueue { fn new() -> Self { Self { queue: ConcurrentQueue::bounded(512), - waker: AtomicWaker::new(), + recv_waker: AtomicWaker::new(), + send_waker: AtomicWaker::new(), } } @@ -2136,10 +2138,21 @@ impl RelayDatagramRecvQueue { item: RelayRecvDatagram, ) -> Result<(), concurrent_queue::PushError> { self.queue.push(item).inspect(|_| { - self.waker.wake(); + self.recv_waker.wake(); }) } + fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll> { + if self.queue.is_closed() { + Poll::Ready(Err(anyhow!("Queue closed"))) + } else if !self.queue.is_full() { + 
Poll::Ready(Ok(())) + } else { + self.send_waker.register(cx.waker()); + Poll::Pending + } + } + /// Polls for new items in the queue. /// /// Although this method is available from `&self`, it must not be @@ -2154,23 +2167,31 @@ impl RelayDatagramRecvQueue { /// to be able to poll from `&self`. fn poll_recv(&self, cx: &mut Context) -> Poll> { match self.queue.pop() { - Ok(value) => Poll::Ready(Ok(value)), + Ok(value) => { + self.send_waker.wake(); + Poll::Ready(Ok(value)) + } Err(concurrent_queue::PopError::Empty) => { - self.waker.register(cx.waker()); + self.recv_waker.register(cx.waker()); match self.queue.pop() { Ok(value) => { - self.waker.take(); + self.send_waker.wake(); + self.recv_waker.take(); Poll::Ready(Ok(value)) } Err(concurrent_queue::PopError::Empty) => Poll::Pending, Err(concurrent_queue::PopError::Closed) => { - self.waker.take(); + self.recv_waker.take(); + self.send_waker.wake(); Poll::Ready(Err(anyhow!("Queue closed"))) } } } - Err(concurrent_queue::PopError::Closed) => Poll::Ready(Err(anyhow!("Queue closed"))), + Err(concurrent_queue::PopError::Closed) => { + self.send_waker.wake(); + Poll::Ready(Err(anyhow!("Queue closed"))) + } } } } diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 46b6e4d4c8f..20df6ca2fd8 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -51,7 +51,10 @@ use n0_future::{ time::{self, Duration, Instant, MissedTickBehavior}, FuturesUnorderedBounded, SinkExt, StreamExt, }; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::{ + mpsc::{self, OwnedPermit}, + oneshot, +}; use tokio_util::sync::CancellationToken; use tracing::{debug, error, event, info_span, instrument, trace, warn, Instrument, Level}; use url::Url; @@ -159,6 +162,20 @@ struct ActiveRelayActor { /// Token indicating the [`ActiveRelayActor`] should stop. stop_token: CancellationToken, metrics: Arc, + /// Received relay packets that could not yet be forwarded to the magicsocket. + pending_received: Option, +} + +#[derive(Debug)] +struct PendingRecv { + packet_iter: PacketSplitIter, + blocked_on: RecvPath, +} + +#[derive(Debug)] +enum RecvPath { + Data, + Disco, } #[derive(Debug)] @@ -263,6 +280,7 @@ impl ActiveRelayActor { inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)), stop_token, metrics, + pending_received: None, } } @@ -612,7 +630,8 @@ impl ActiveRelayActor { let fut = client_sink.send_all(&mut packet_stream); self.run_sending(fut, &mut state, &mut client_stream).await?; } - msg = client_stream.next() => { + _ = forward_pending(&mut self.pending_received, &self.relay_datagrams_recv, &mut self.relay_disco_recv), if self.pending_received.is_some() => {} + msg = client_stream.next(), if self.pending_received.is_none() => { let Some(msg) = msg else { break Err(anyhow!("Stream closed by server.")); }; @@ -659,33 +678,14 @@ impl ActiveRelayActor { state.last_packet_src = Some(remote_node_id); state.nodes_present.insert(remote_node_id); } - for datagram in PacketSplitIter::new(self.url.clone(), remote_node_id, data) { - let Ok(datagram) = datagram else { - warn!("Invalid packet split"); - break; - }; - match crate::disco::source_and_box_bytes(&datagram.buf) { - Some((source, sealed_box)) => { - if remote_node_id != source { - // TODO: return here? 
- warn!("Received relay disco message from connection for {}, but with message from {}", remote_node_id.fmt_short(), source.fmt_short()); - } - let message = RelayDiscoMessage { - source, - sealed_box, - relay_url: datagram.url.clone(), - relay_remote_node_id: datagram.src, - }; - if let Err(err) = self.relay_disco_recv.try_send(message) { - warn!("Dropping received relay disco packet: {err:#}"); - } - } - None => { - if let Err(err) = self.relay_datagrams_recv.try_send(datagram) { - warn!("Dropping received relay data packet: {err:#}"); - } - } - } + let packet_iter = PacketSplitIter::new(self.url.clone(), remote_node_id, data); + if let Some(pending) = handle_received_packet_iter( + packet_iter, + None, + &self.relay_datagrams_recv, + &mut self.relay_disco_recv, + ) { + self.pending_received = Some(pending); } } ReceivedMessage::NodeGone(node_id) => { @@ -769,7 +769,8 @@ impl ActiveRelayActor { break Err(anyhow!("Ping timeout")); } // No need to read the inbox or datagrams to send. - msg = client_stream.next() => { + _ = forward_pending(&mut self.pending_received, &self.relay_datagrams_recv, &mut self.relay_disco_recv), if self.pending_received.is_some() => {} + msg = client_stream.next(), if self.pending_received.is_none() => { let Some(msg) = msg else { break Err(anyhow!("Stream closed by server.")); }; @@ -788,6 +789,105 @@ impl ActiveRelayActor { } } +/// Forward pending received packets to their queues. +/// +/// If `maybe_pending` is not empty, this will wait for the path the last received item +/// is blocked on (via [`PendingRecv::blocked_on`]) to become unblocked. It will then forward +/// the pending items, until a queue is blocked again. In that case, the remaining items will +/// be put back into `maybe_pending`. If all items could be sent, `maybe_pending` will be set +/// to `None`. +/// +/// This function is cancellation-safe: If the future is dropped at any point, all items are guaranteed +/// to either be sent into their respective queues, or are still in `maybe_pending`. +async fn forward_pending( + maybe_pending: &mut Option, + relay_datagrams_recv: &RelayDatagramRecvQueue, + relay_disco_recv: &mut mpsc::Sender, +) { + // We take a mutable reference onto the inner value. + // we're not `take`ing it here, because this would make the function not cancellation safe. + let Some(ref mut pending) = maybe_pending else { + return; + }; + let disco_permit = match pending.blocked_on { + RecvPath::Data => { + std::future::poll_fn(|cx| relay_datagrams_recv.poll_send_ready(cx)) + .await + .ok(); + None + } + RecvPath::Disco => { + let Ok(permit) = relay_disco_recv.clone().reserve_owned().await else { + return; + }; + Some(permit) + } + }; + // We now take the inner value by value. it is cancellation safe here because + // no further `await`s occur after here. + // The unwrap is guaranteed to be safe because we checked above that it is not none. 
+ #[allow(clippy::unwrap_used, reason = "checked above")] + let pending = maybe_pending.take().unwrap(); + if let Some(pending) = handle_received_packet_iter( + pending.packet_iter, + disco_permit, + relay_datagrams_recv, + relay_disco_recv, + ) { + *maybe_pending = Some(pending); + } +} + +fn handle_received_packet_iter( + mut packet_iter: PacketSplitIter, + mut disco_permit: Option>, + relay_datagrams_recv: &RelayDatagramRecvQueue, + relay_disco_recv: &mut mpsc::Sender, +) -> Option { + let remote_node_id = packet_iter.remote_node_id(); + for datagram in &mut packet_iter { + let Ok(datagram) = datagram else { + warn!("Invalid packet split"); + return None; + }; + match crate::disco::source_and_box_bytes(&datagram.buf) { + Some((source, sealed_box)) => { + if remote_node_id != source { + // TODO: return here? + warn!("Received relay disco message from connection for {}, but with message from {}", remote_node_id.fmt_short(), source.fmt_short()); + } + let message = RelayDiscoMessage { + source, + sealed_box, + relay_url: datagram.url.clone(), + relay_remote_node_id: datagram.src, + }; + if let Some(permit) = disco_permit.take() { + permit.send(message); + } else if let Err(err) = relay_disco_recv.try_send(message) { + warn!("Dropping received relay disco packet: {err:#}"); + packet_iter.push_front(datagram); + return Some(PendingRecv { + packet_iter, + blocked_on: RecvPath::Disco, + }); + } + } + None => { + if let Err(err) = relay_datagrams_recv.try_send(datagram) { + warn!("Dropping received relay data packet: {err:#}"); + packet_iter.push_front(err.into_inner()); + return Some(PendingRecv { + packet_iter, + blocked_on: RecvPath::Data, + }); + } + } + } + } + None +} + /// Shared state when the [`ActiveRelayActor`] is connected to a relay server. /// /// Common state between [`ActiveRelayActor::run_connected`] and @@ -1270,12 +1370,22 @@ struct PacketSplitIter { url: RelayUrl, src: NodeId, bytes: Bytes, + next: Option, } impl PacketSplitIter { /// Create a new PacketSplitIter from a packet. fn new(url: RelayUrl, src: NodeId, bytes: Bytes) -> Self { - Self { url, src, bytes } + Self { + url, + src, + bytes, + next: None, + } + } + + fn remote_node_id(&self) -> NodeId { + self.src } fn fail(&mut self) -> Option> { @@ -1285,6 +1395,10 @@ impl PacketSplitIter { "", ))) } + + fn push_front(&mut self, item: RelayRecvDatagram) { + self.next = Some(item); + } } impl Iterator for PacketSplitIter { @@ -1292,6 +1406,9 @@ impl Iterator for PacketSplitIter { fn next(&mut self) -> Option { use bytes::Buf; + if let Some(item) = self.next.take() { + return Some(Ok(item)); + } if self.bytes.has_remaining() { if self.bytes.remaining() < 2 { return self.fail(); From ceaca9fbd0554a90796314da5b9c373027a85c7e Mon Sep 17 00:00:00 2001 From: Frando Date: Thu, 15 May 2025 21:40:16 +0200 Subject: [PATCH 08/10] refactor: make the new code paths cleaner --- iroh/src/magicsock/relay_actor.rs | 247 +++++++++++++++--------------- 1 file changed, 125 insertions(+), 122 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 20df6ca2fd8..6ae2f0c170b 100644 --- a/iroh/src/magicsock/relay_actor.rs +++ b/iroh/src/magicsock/relay_actor.rs @@ -136,10 +136,6 @@ struct ActiveRelayActor { prio_inbox: mpsc::Receiver, /// Inbox for messages which involve sending to the relay server. inbox: mpsc::Receiver, - /// Queue for received relay datagrams. - relay_datagrams_recv: Arc, - /// Queue for received relay disco packets. 
- relay_disco_recv: mpsc::Sender, /// Channel on which we queue packets to send to the relay. relay_datagrams_send: mpsc::Receiver, @@ -162,20 +158,7 @@ struct ActiveRelayActor { /// Token indicating the [`ActiveRelayActor`] should stop. stop_token: CancellationToken, metrics: Arc, - /// Received relay packets that could not yet be forwarded to the magicsocket. - pending_received: Option, -} - -#[derive(Debug)] -struct PendingRecv { - packet_iter: PacketSplitIter, - blocked_on: RecvPath, -} - -#[derive(Debug)] -enum RecvPath { - Data, - Disco, + receive_queue: ReceiveQueue, } #[derive(Debug)] @@ -271,16 +254,18 @@ impl ActiveRelayActor { ActiveRelayActor { prio_inbox, inbox, - relay_datagrams_recv, relay_datagrams_send, - relay_disco_recv, url, relay_client_builder, is_home_relay: false, inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)), stop_token, metrics, - pending_received: None, + receive_queue: ReceiveQueue { + relay_datagrams_recv, + relay_disco_recv, + pending: None, + }, } } @@ -630,8 +615,8 @@ impl ActiveRelayActor { let fut = client_sink.send_all(&mut packet_stream); self.run_sending(fut, &mut state, &mut client_stream).await?; } - _ = forward_pending(&mut self.pending_received, &self.relay_datagrams_recv, &mut self.relay_disco_recv), if self.pending_received.is_some() => {} - msg = client_stream.next(), if self.pending_received.is_none() => { + () = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {} + msg = client_stream.next(), if !self.receive_queue.is_pending() => { let Some(msg) = msg else { break Err(anyhow!("Stream closed by server.")); }; @@ -678,15 +663,8 @@ impl ActiveRelayActor { state.last_packet_src = Some(remote_node_id); state.nodes_present.insert(remote_node_id); } - let packet_iter = PacketSplitIter::new(self.url.clone(), remote_node_id, data); - if let Some(pending) = handle_received_packet_iter( - packet_iter, - None, - &self.relay_datagrams_recv, - &mut self.relay_disco_recv, - ) { - self.pending_received = Some(pending); - } + let packets = PacketSplitIter::new(self.url.clone(), remote_node_id, data); + self.receive_queue.queue_packets(packets); } ReceivedMessage::NodeGone(node_id) => { state.nodes_present.remove(&node_id); @@ -769,8 +747,8 @@ impl ActiveRelayActor { break Err(anyhow!("Ping timeout")); } // No need to read the inbox or datagrams to send. - _ = forward_pending(&mut self.pending_received, &self.relay_datagrams_recv, &mut self.relay_disco_recv), if self.pending_received.is_some() => {} - msg = client_stream.next(), if self.pending_received.is_none() => { + () = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {} + msg = client_stream.next(), if !self.receive_queue.is_pending() => { let Some(msg) = msg else { break Err(anyhow!("Stream closed by server.")); }; @@ -789,103 +767,128 @@ impl ActiveRelayActor { } } -/// Forward pending received packets to their queues. -/// -/// If `maybe_pending` is not empty, this will wait for the path the last received item -/// is blocked on (via [`PendingRecv::blocked_on`]) to become unblocked. It will then forward -/// the pending items, until a queue is blocked again. In that case, the remaining items will -/// be put back into `maybe_pending`. If all items could be sent, `maybe_pending` will be set -/// to `None`. -/// -/// This function is cancellation-safe: If the future is dropped at any point, all items are guaranteed -/// to either be sent into their respective queues, or are still in `maybe_pending`. 
-async fn forward_pending( - maybe_pending: &mut Option, - relay_datagrams_recv: &RelayDatagramRecvQueue, - relay_disco_recv: &mut mpsc::Sender, -) { - // We take a mutable reference onto the inner value. - // we're not `take`ing it here, because this would make the function not cancellation safe. - let Some(ref mut pending) = maybe_pending else { - return; - }; - let disco_permit = match pending.blocked_on { - RecvPath::Data => { - std::future::poll_fn(|cx| relay_datagrams_recv.poll_send_ready(cx)) - .await - .ok(); - None - } - RecvPath::Disco => { - let Ok(permit) = relay_disco_recv.clone().reserve_owned().await else { - return; - }; - Some(permit) - } - }; - // We now take the inner value by value. it is cancellation safe here because - // no further `await`s occur after here. - // The unwrap is guaranteed to be safe because we checked above that it is not none. - #[allow(clippy::unwrap_used, reason = "checked above")] - let pending = maybe_pending.take().unwrap(); - if let Some(pending) = handle_received_packet_iter( - pending.packet_iter, - disco_permit, - relay_datagrams_recv, - relay_disco_recv, - ) { - *maybe_pending = Some(pending); - } +#[derive(Debug)] +struct ReceiveQueue { + /// Received relay packets that could not yet be forwarded to the magicsocket. + pending: Option, + /// Queue for received relay datagrams. + relay_datagrams_recv: Arc, + /// Queue for received relay disco packets. + relay_disco_recv: mpsc::Sender, +} + +#[derive(Debug)] +struct PendingRecv { + packets: PacketSplitIter, + blocked_on: RecvPath, +} + +#[derive(Debug)] +enum RecvPath { + Data, + Disco, } -fn handle_received_packet_iter( - mut packet_iter: PacketSplitIter, - mut disco_permit: Option>, - relay_datagrams_recv: &RelayDatagramRecvQueue, - relay_disco_recv: &mut mpsc::Sender, -) -> Option { - let remote_node_id = packet_iter.remote_node_id(); - for datagram in &mut packet_iter { - let Ok(datagram) = datagram else { - warn!("Invalid packet split"); - return None; +impl ReceiveQueue { + fn is_pending(&self) -> bool { + self.pending.is_some() + } + + fn queue_packets(&mut self, packets: PacketSplitIter) { + debug_assert!( + !self.is_pending(), + "ReceiveQueue::queue_packets may not be called if is_pending() returns true" + ); + self.handle_packets(packets, None); + } + + /// Forward pending received packets to their queues. + /// + /// This will wait for the path the last received item is blocked on (via [`PendingRecv::blocked_on`]) + /// to become unblocked. It will then forward the pending items, until a queue is blocked again. + /// In that case, the remaining items will be stored and [`Self::is_pending`] returns true. + /// + /// This function is cancellation-safe: If the future is dropped at any point, all items are guaranteed + /// to either be sent into their respective queues or preserved here. + async fn forward_pending(&mut self) { + // We take a reference onto the inner value. + // we're not `take`ing it here, because this would make the function not cancellation safe. + let Some(ref pending) = self.pending else { + return; }; - match crate::disco::source_and_box_bytes(&datagram.buf) { - Some((source, sealed_box)) => { - if remote_node_id != source { - // TODO: return here? 
- warn!("Received relay disco message from connection for {}, but with message from {}", remote_node_id.fmt_short(), source.fmt_short()); - } - let message = RelayDiscoMessage { - source, - sealed_box, - relay_url: datagram.url.clone(), - relay_remote_node_id: datagram.src, + let disco_permit = match pending.blocked_on { + RecvPath::Data => { + std::future::poll_fn(|cx| self.relay_datagrams_recv.poll_send_ready(cx)) + .await + .ok(); + None + } + RecvPath::Disco => { + let Ok(permit) = self.relay_disco_recv.clone().reserve_owned().await else { + return; }; - if let Some(permit) = disco_permit.take() { - permit.send(message); - } else if let Err(err) = relay_disco_recv.try_send(message) { - warn!("Dropping received relay disco packet: {err:#}"); - packet_iter.push_front(datagram); - return Some(PendingRecv { - packet_iter, - blocked_on: RecvPath::Disco, - }); - } + Some(permit) } - None => { - if let Err(err) = relay_datagrams_recv.try_send(datagram) { - warn!("Dropping received relay data packet: {err:#}"); - packet_iter.push_front(err.into_inner()); - return Some(PendingRecv { - packet_iter, - blocked_on: RecvPath::Data, - }); + }; + let packets = self + .pending + .take() + .expect("checked to be not empty") + .packets; + self.handle_packets(packets, disco_permit); + } + + fn handle_packets( + &mut self, + mut packet_iter: PacketSplitIter, + mut disco_permit: Option>, + ) { + let remote_node_id = packet_iter.remote_node_id(); + for datagram in &mut packet_iter { + let Ok(datagram) = datagram else { + warn!("Invalid packet split"); + self.pending = None; + return; + }; + match crate::disco::source_and_box_bytes(&datagram.buf) { + Some((source, sealed_box)) => { + if remote_node_id != source { + // TODO: return here? + warn!("Received relay disco message from connection for {}, but with message from {}", remote_node_id.fmt_short(), source.fmt_short()); + } + let message = RelayDiscoMessage { + source, + sealed_box, + relay_url: datagram.url.clone(), + relay_remote_node_id: datagram.src, + }; + if let Some(permit) = disco_permit.take() { + permit.send(message); + } else if let Err(err) = self.relay_disco_recv.try_send(message) { + warn!("Relay disco receive queue blocked: {err}"); + packet_iter.push_front(datagram); + self.pending = Some(PendingRecv { + packets: packet_iter, + blocked_on: RecvPath::Disco, + }); + return; + } + } + None => { + if let Err(err) = self.relay_datagrams_recv.try_send(datagram) { + warn!("Relay data receive queue blocked: {err}"); + packet_iter.push_front(err.into_inner()); + self.pending = Some(PendingRecv { + packets: packet_iter, + blocked_on: RecvPath::Data, + }); + return; + } } } } + self.pending = None; } - None } /// Shared state when the [`ActiveRelayActor`] is connected to a relay server. 
From c52390a58600d4b9a15c5eb577c3dd9274718202 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 16 May 2025 09:52:19 +0200 Subject: [PATCH 09/10] fix: store a list of send wakers on the relay data queue --- iroh/src/magicsock.rs | 44 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 36 insertions(+), 8 deletions(-) diff --git a/iroh/src/magicsock.rs b/iroh/src/magicsock.rs index a4e0edf31b5..880277fe9ba 100644 --- a/iroh/src/magicsock.rs +++ b/iroh/src/magicsock.rs @@ -2116,7 +2116,7 @@ impl RelayDatagramSendChannelReceiver { struct RelayDatagramRecvQueue { queue: ConcurrentQueue, recv_waker: AtomicWaker, - send_waker: AtomicWaker, + send_wakers: ConcurrentQueue, } impl RelayDatagramRecvQueue { @@ -2125,7 +2125,7 @@ impl RelayDatagramRecvQueue { Self { queue: ConcurrentQueue::bounded(512), recv_waker: AtomicWaker::new(), - send_waker: AtomicWaker::new(), + send_wakers: ConcurrentQueue::unbounded(), } } @@ -2142,14 +2142,42 @@ impl RelayDatagramRecvQueue { }) } + /// Polls for whether the queue has free slots for sending items. + /// + /// If the queue has free slots, this returns [`Poll::Ready`]. + /// If the queue is full, [`Poll::Pending`] is returned and the waker + /// is stored and woken once the queue has free slots. + /// + /// This can be called from multiple tasks concurrently. If a slot becomes + /// available, all stored wakers will be woken simultaneously. + /// This also means that even if [`Poll::Ready`] is returned, it is not + /// guaranteed that [`Self::try_send`] will return `Ok` on the next call, + /// because another send task could have used the slot already. fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll> { if self.queue.is_closed() { Poll::Ready(Err(anyhow!("Queue closed"))) } else if !self.queue.is_full() { Poll::Ready(Ok(())) } else { - self.send_waker.register(cx.waker()); - Poll::Pending + match self.send_wakers.push(cx.waker().clone()) { + Ok(()) => Poll::Pending, + Err(concurrent_queue::PushError::Full(_)) => { + unreachable!("Send waker queue is unbounded") + } + Err(concurrent_queue::PushError::Closed(_)) => { + Poll::Ready(Err(anyhow!("Queue closed"))) + } + } + } + } + + async fn send_ready(&self) -> Result<()> { + std::future::poll_fn(|cx| self.poll_send_ready(cx)).await + } + + fn wake_senders(&self) { + while let Ok(waker) = self.send_wakers.pop() { + waker.wake(); } } @@ -2168,7 +2196,7 @@ impl RelayDatagramRecvQueue { fn poll_recv(&self, cx: &mut Context) -> Poll> { match self.queue.pop() { Ok(value) => { - self.send_waker.wake(); + self.wake_senders(); Poll::Ready(Ok(value)) } Err(concurrent_queue::PopError::Empty) => { @@ -2176,20 +2204,20 @@ impl RelayDatagramRecvQueue { match self.queue.pop() { Ok(value) => { - self.send_waker.wake(); self.recv_waker.take(); + self.wake_senders(); Poll::Ready(Ok(value)) } Err(concurrent_queue::PopError::Empty) => Poll::Pending, Err(concurrent_queue::PopError::Closed) => { self.recv_waker.take(); - self.send_waker.wake(); + self.wake_senders(); Poll::Ready(Err(anyhow!("Queue closed"))) } } } Err(concurrent_queue::PopError::Closed) => { - self.send_waker.wake(); + self.wake_senders(); Poll::Ready(Err(anyhow!("Queue closed"))) } } From ba9c167d5b9b64482b1df11d472cc9b3a66fe718 Mon Sep 17 00:00:00 2001 From: Frando Date: Fri, 16 May 2025 09:52:47 +0200 Subject: [PATCH 10/10] cleanups and docs --- iroh/src/magicsock/relay_actor.rs | 73 ++++++++++++++++++++----------- 1 file changed, 48 insertions(+), 25 deletions(-) diff --git a/iroh/src/magicsock/relay_actor.rs b/iroh/src/magicsock/relay_actor.rs index 
6ae2f0c170b..bfa91c327e9 100644
--- a/iroh/src/magicsock/relay_actor.rs
+++ b/iroh/src/magicsock/relay_actor.rs
@@ -255,13 +255,14 @@ impl ActiveRelayActor {
             prio_inbox,
             inbox,
             relay_datagrams_send,
-            url,
+            url: url.clone(),
             relay_client_builder,
             is_home_relay: false,
             inactive_timeout: Box::pin(time::sleep(RELAY_INACTIVE_CLEANUP_TIME)),
             stop_token,
             metrics,
             receive_queue: ReceiveQueue {
+                relay_url: url,
                 relay_datagrams_recv,
                 relay_disco_recv,
                 pending: None,
@@ -454,6 +455,7 @@ impl ActiveRelayActor {
                     }
                 }
             }
+            _ = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {},
             _ = &mut self.inactive_timeout, if !self.is_home_relay => {
                 debug!(?RELAY_INACTIVE_CLEANUP_TIME, "Inactive, exiting.");
                 break None;
@@ -615,7 +617,11 @@ impl ActiveRelayActor {
                     let fut = client_sink.send_all(&mut packet_stream);
                     self.run_sending(fut, &mut state, &mut client_stream).await?;
                 }
-                () = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {}
+                res = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {
+                    if let Err(err) = res {
+                        break Err(err);
+                    }
+                }
                 msg = client_stream.next(), if !self.receive_queue.is_pending() => {
                     let Some(msg) = msg else {
                         break Err(anyhow!("Stream closed by server."));
@@ -659,12 +665,11 @@ impl ActiveRelayActor {
                     .map(|p| *p != remote_node_id)
                     .unwrap_or(true)
                 {
                     // Avoid map lookup with high throughput single peer.
                     state.last_packet_src = Some(remote_node_id);
                     state.nodes_present.insert(remote_node_id);
                 }
-                let packets = PacketSplitIter::new(self.url.clone(), remote_node_id, data);
-                self.receive_queue.queue_packets(packets);
+                self.receive_queue.queue_packets(remote_node_id, data);
             }
             ReceivedMessage::NodeGone(node_id) => {
                 state.nodes_present.remove(&node_id);
@@ -747,7 +752,11 @@ impl ActiveRelayActor {
                     break Err(anyhow!("Ping timeout"));
                 }
                 // No need to read the inbox or datagrams to send.
-                () = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {}
+                res = self.receive_queue.forward_pending(), if self.receive_queue.is_pending() => {
+                    if let Err(err) = res {
+                        break Err(err);
+                    }
+                }
                 msg = client_stream.next(), if !self.receive_queue.is_pending() => {
                     let Some(msg) = msg else {
                         break Err(anyhow!("Stream closed by server."));
@@ -769,6 +778,7 @@ impl ActiveRelayActor {
 
 #[derive(Debug)]
 struct ReceiveQueue {
+    relay_url: RelayUrl,
     /// Received relay packets that could not yet be forwarded to the magicsocket.
     pending: Option,
     /// Queue for received relay datagrams.
@@ -794,8 +804,19 @@ impl ReceiveQueue {
         self.pending.is_some()
     }
 
-    fn queue_packets(&mut self, packets: PacketSplitIter) {
-        debug_assert!(
+    /// Send packets to their respective queues.
+    ///
+    /// If a queue is blocked, the packets that were not yet sent will be stored on [`Self`],
+    /// and [`Self::is_pending`] will return true. You then need to await [`Self::forward_pending`]
+    /// in a loop until [`Self::is_pending`] returns false again. Only then call [`Self::queue_packets`]
+    /// again. Otherwise this function will panic.
+    ///
+    /// ## Panics
+    ///
+    /// Panics if [`Self::is_pending`] returns `true`.
+ fn queue_packets(&mut self, remote_node_id: NodeId, data: Bytes) { + let packets = PacketSplitIter::new(self.relay_url.clone(), remote_node_id, data); + assert!( !self.is_pending(), "ReceiveQueue::queue_packets may not be called if is_pending() returns true" ); @@ -808,47 +829,49 @@ impl ReceiveQueue { /// to become unblocked. It will then forward the pending items, until a queue is blocked again. /// In that case, the remaining items will be stored and [`Self::is_pending`] returns true. /// + /// Returns an error if the queue we're blocked on is closed. + /// /// This function is cancellation-safe: If the future is dropped at any point, all items are guaranteed /// to either be sent into their respective queues or preserved here. - async fn forward_pending(&mut self) { + async fn forward_pending(&mut self) -> Result<()> { // We take a reference onto the inner value. // we're not `take`ing it here, because this would make the function not cancellation safe. let Some(ref pending) = self.pending else { - return; + return Ok(()); }; let disco_permit = match pending.blocked_on { RecvPath::Data => { - std::future::poll_fn(|cx| self.relay_datagrams_recv.poll_send_ready(cx)) - .await - .ok(); + // The data receive queue does not have permits, so we can only wait for free slots. + self.relay_datagrams_recv.send_ready().await?; None } RecvPath::Disco => { - let Ok(permit) = self.relay_disco_recv.clone().reserve_owned().await else { - return; - }; + // The disco receive channel has permits, so we can reserve a permit to use afterwards + // to send at least one item. + let permit = self.relay_disco_recv.clone().reserve_owned().await?; Some(permit) } }; + // We checked above that `self.pending` is not `None` so this `expect` is safe. let packets = self .pending .take() .expect("checked to be not empty") .packets; self.handle_packets(packets, disco_permit); + Ok(()) } fn handle_packets( &mut self, - mut packet_iter: PacketSplitIter, + mut packets: PacketSplitIter, mut disco_permit: Option>, ) { - let remote_node_id = packet_iter.remote_node_id(); - for datagram in &mut packet_iter { + let remote_node_id = packets.remote_node_id(); + for datagram in &mut packets { let Ok(datagram) = datagram else { warn!("Invalid packet split"); - self.pending = None; - return; + break; }; match crate::disco::source_and_box_bytes(&datagram.buf) { Some((source, sealed_box)) => { @@ -866,9 +889,9 @@ impl ReceiveQueue { permit.send(message); } else if let Err(err) = self.relay_disco_recv.try_send(message) { warn!("Relay disco receive queue blocked: {err}"); - packet_iter.push_front(datagram); + packets.push_front(datagram); self.pending = Some(PendingRecv { - packets: packet_iter, + packets, blocked_on: RecvPath::Disco, }); return; @@ -877,9 +900,9 @@ impl ReceiveQueue { None => { if let Err(err) = self.relay_datagrams_recv.try_send(datagram) { warn!("Relay data receive queue blocked: {err}"); - packet_iter.push_front(err.into_inner()); + packets.push_front(err.into_inner()); self.pending = Some(PendingRecv { - packets: packet_iter, + packets, blocked_on: RecvPath::Data, }); return;
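
Patch 09 above replaces the single send-side AtomicWaker on the bounded receive queue with an unbounded queue of wakers, so that every task parked in `poll_send_ready` is woken when the receiver frees a slot. The stripped-down sketch below shows that queue shape in isolation; it uses the concurrent-queue crate plus std wakers, with made-up names, and is not iroh's actual RelayDatagramRecvQueue.

```rust
// Stripped-down illustration of the waker-list queue from patch 09 (invented
// names, not iroh's RelayDatagramRecvQueue). Bounded item queue, one receiver
// waker, and an unbounded list of sender wakers drained on every pop.
use std::sync::Mutex;
use std::task::{Context, Poll, Waker};

use concurrent_queue::{ConcurrentQueue, PopError, PushError};

struct WakingQueue<T> {
    queue: ConcurrentQueue<T>,
    recv_waker: Mutex<Option<Waker>>,
    send_wakers: ConcurrentQueue<Waker>,
}

impl<T> WakingQueue<T> {
    fn new(capacity: usize) -> Self {
        Self {
            queue: ConcurrentQueue::bounded(capacity),
            recv_waker: Mutex::new(None),
            send_wakers: ConcurrentQueue::unbounded(),
        }
    }

    /// Non-blocking send; wakes the receiver on success.
    fn try_send(&self, item: T) -> Result<(), PushError<T>> {
        self.queue.push(item).inspect(|_| {
            if let Some(waker) = self.recv_waker.lock().unwrap().take() {
                waker.wake();
            }
        })
    }

    /// Ready when a slot is (probably) free. Many tasks may wait here at
    /// once; all of them are woken together, so a following `try_send` can
    /// still find the queue full and has to park again.
    fn poll_send_ready(&self, cx: &mut Context<'_>) -> Poll<()> {
        if !self.queue.is_full() || self.queue.is_closed() {
            return Poll::Ready(());
        }
        // `send_wakers` is unbounded, so pushing only fails once closed.
        let _ = self.send_wakers.push(cx.waker().clone());
        Poll::Pending
    }

    /// Pop one item and wake every parked sender, since a slot just freed up.
    fn poll_recv(&self, cx: &mut Context<'_>) -> Poll<Option<T>> {
        match self.queue.pop() {
            Ok(item) => {
                self.wake_senders();
                Poll::Ready(Some(item))
            }
            Err(PopError::Empty) => {
                // Register first, then check again, so a send racing with the
                // registration cannot be missed.
                *self.recv_waker.lock().unwrap() = Some(cx.waker().clone());
                match self.queue.pop() {
                    Ok(item) => {
                        self.recv_waker.lock().unwrap().take();
                        self.wake_senders();
                        Poll::Ready(Some(item))
                    }
                    Err(PopError::Empty) => Poll::Pending,
                    Err(PopError::Closed) => Poll::Ready(None),
                }
            }
            Err(PopError::Closed) => Poll::Ready(None),
        }
    }

    fn wake_senders(&self) {
        while let Ok(waker) = self.send_wakers.pop() {
            waker.wake();
        }
    }
}
```

Waking all parked senders rather than one matches the doc comment added in patch 09: several relay actors block on the same receive queue, and a woken sender may still find the slot already taken, which is why `poll_send_ready` only signals likely readiness and the subsequent `try_send` must still handle a full queue.
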