Skip to content

Commit acfeda5

Browse files
committed
Send ECN bits and use stride instead of custom split protocol
1 parent 6ea0b26 commit acfeda5

File tree

10 files changed

+413
-425
lines changed

10 files changed

+413
-425
lines changed

iroh-relay/src/client/conn.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use crate::client::streams::{MaybeTlsStream, ProxyStream};
1919
use crate::{
2020
protos::{
2121
handshake,
22-
relay::{ClientToRelayMsg, Error as ProtoError, RelayToClientMsg},
22+
relay::{ClientToRelayMsg, Error as ProtoError, RelayToClientMsg, MAX_PAYLOAD_SIZE},
2323
streams::WsBytesFramed,
2424
},
2525
MAX_PACKET_SIZE,
@@ -143,9 +143,9 @@ impl Sink<ClientToRelayMsg> for Conn {
143143
}
144144

145145
fn start_send(mut self: Pin<&mut Self>, frame: ClientToRelayMsg) -> Result<(), Self::Error> {
146-
if let ClientToRelayMsg::SendPacket { .. } = &frame {
147-
let size = frame.encoded_len();
148-
snafu::ensure!(size <= MAX_PACKET_SIZE, ExceedsMaxPacketSizeSnafu { size });
146+
if let ClientToRelayMsg::Datagrams { datagrams, .. } = &frame {
147+
let size = datagrams.contents.len();
148+
snafu::ensure!(size <= MAX_PAYLOAD_SIZE, ExceedsMaxPacketSizeSnafu { size });
149149
}
150150

151151
Pin::new(&mut self.conn)

iroh-relay/src/protos/common.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,9 @@ pub enum FrameType {
2424
/// The server frame type for authentication denial
2525
ServerDeniesAuth = 3,
2626
/// 32B dest pub key + ECN byte + segment size u16 + datagrams contents
27-
SendPacket = 4,
27+
ClientToRelayDatagrams = 4,
2828
/// 32B src pub key + ECN byte + segment size u16 + datagrams contents
29-
RecvPacket = 6,
29+
RelayToClientDatagrams = 6,
3030
/// Sent from server to client to signal that a previous sender is no longer connected.
3131
///
3232
/// That is, if A sent to B, and then if A disconnects, the server sends `FrameType::PeerGone`

iroh-relay/src/protos/relay.rs

Lines changed: 164 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,12 @@ use crate::KeyCache;
2121
/// including its on-wire framing overhead)
2222
pub const MAX_PACKET_SIZE: usize = 64 * 1024;
2323

24+
/// Maximum size a datagram payload is allowed to be.
25+
///
26+
/// This is [`MAX_PACKET_SIZE`] minus the length of an encoded public key minus 3 bytes,
27+
/// one for ECN, and two for the segment size.
28+
pub const MAX_PAYLOAD_SIZE: usize = MAX_PACKET_SIZE - NodeId::LENGTH - 3;
29+
2430
/// The maximum frame size.
2531
///
2632
/// This is also the minimum burst size that a rate-limiter has to accept.
@@ -74,12 +80,11 @@ pub enum Error {
7480
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
7581
pub enum RelayToClientMsg {
7682
/// Represents datagrams sent from relays (originally sent to them by another client).
77-
ReceivedPacket {
83+
Datagrams {
7884
/// The [`NodeId`] of the original sender.
79-
src_key: NodeId,
80-
/// The received packet bytes.
81-
#[debug(skip)]
82-
content: Bytes,
85+
remote_node_id: NodeId,
86+
/// The datagrams and related metadata.
87+
datagrams: Datagrams,
8388
},
8489
/// Indicates that the client identified by the underlying public key had previously sent you a
8590
/// packet but has now disconnected from the relay.
@@ -123,19 +128,85 @@ pub enum ClientToRelayMsg {
123128
/// with the payload sent previously in the ping.
124129
Pong([u8; 8]),
125130
/// Request from the client to relay datagrams to given remote node.
126-
SendPacket {
131+
Datagrams {
127132
/// The remote node to relay to.
128-
dst_key: NodeId,
133+
dst_node_id: NodeId,
129134
/// The datagrams and related metadata to relay.
130-
packet: Bytes,
135+
datagrams: Datagrams,
131136
},
132137
}
133138

139+
/// One or multiple datagrams being transferred via the relay.
140+
///
141+
/// This type is modeled after [`quinn_proto::Transmit`]
142+
/// (or even more similarly `quinn_udp::Transmit`, but we don't depend on that library here).
143+
#[derive(derive_more::Debug, Clone, PartialEq, Eq)]
144+
pub struct Datagrams {
145+
/// Explicit congestion notification bits
146+
pub ecn: Option<quinn_proto::EcnCodepoint>,
147+
/// The segment size if this transmission contains multiple datagrams.
148+
/// This is `None` if the transmit only contains a single datagram
149+
pub segment_size: Option<u16>,
150+
/// The contents of the datagram(s)
151+
#[debug(skip)]
152+
pub contents: Bytes,
153+
}
154+
155+
impl<T: AsRef<[u8]>> From<T> for Datagrams {
156+
fn from(bytes: T) -> Self {
157+
Self {
158+
ecn: None,
159+
segment_size: None,
160+
contents: Bytes::copy_from_slice(bytes.as_ref()),
161+
}
162+
}
163+
}
164+
165+
impl Datagrams {
166+
fn write_to<O: BufMut>(&self, mut dst: O) -> O {
167+
let ecn = self.ecn.map_or(0, |ecn| ecn as u8);
168+
let segment_size = self.segment_size.unwrap_or_default();
169+
dst.put_u8(ecn);
170+
dst.put_u16(segment_size);
171+
dst.put(self.contents.as_ref());
172+
dst
173+
}
174+
175+
fn encoded_len(&self) -> usize {
176+
1 // ECN byte
177+
+ 2 // segment size
178+
+ self.contents.len()
179+
}
180+
181+
fn from_bytes(bytes: Bytes) -> Result<Self, Error> {
182+
// 1 bytes ECN, 2 bytes segment size
183+
snafu::ensure!(bytes.len() > 3, InvalidFrameSnafu);
184+
185+
let ecn_byte = bytes[0];
186+
let ecn = quinn_proto::EcnCodepoint::from_bits(ecn_byte);
187+
188+
let segment_size = u16::from_be_bytes(bytes[1..3].try_into().expect("length checked"));
189+
let segment_size = if segment_size == 0 {
190+
None
191+
} else {
192+
Some(segment_size)
193+
};
194+
195+
let contents = bytes.slice(3..);
196+
197+
Ok(Self {
198+
ecn,
199+
segment_size,
200+
contents,
201+
})
202+
}
203+
}
204+
134205
impl RelayToClientMsg {
135206
/// Returns this frame's corresponding frame type.
136207
pub fn typ(&self) -> FrameType {
137208
match self {
138-
Self::ReceivedPacket { .. } => FrameType::RecvPacket,
209+
Self::Datagrams { .. } => FrameType::RelayToClientDatagrams,
139210
Self::NodeGone { .. } => FrameType::NodeGone,
140211
Self::Ping { .. } => FrameType::Ping,
141212
Self::Pong { .. } => FrameType::Pong,
@@ -156,12 +227,12 @@ impl RelayToClientMsg {
156227
pub(crate) fn write_to<O: BufMut>(&self, mut dst: O) -> O {
157228
dst = self.typ().write_to(dst);
158229
match self {
159-
Self::ReceivedPacket {
160-
src_key: remote_node_id,
161-
content,
230+
Self::Datagrams {
231+
remote_node_id,
232+
datagrams,
162233
} => {
163234
dst.put(remote_node_id.as_ref());
164-
dst.put(content.as_ref());
235+
dst = datagrams.write_to(dst);
165236
}
166237
Self::NodeGone(node_id) => {
167238
dst.put(node_id.as_ref());
@@ -189,9 +260,9 @@ impl RelayToClientMsg {
189260
#[cfg(feature = "server")]
190261
pub(crate) fn encoded_len(&self) -> usize {
191262
let payload_len = match self {
192-
Self::ReceivedPacket { content, .. } => {
263+
Self::Datagrams { datagrams, .. } => {
193264
32 // nodeid
194-
+ content.len()
265+
+ datagrams.encoded_len()
195266
}
196267
Self::NodeGone(_) => 32,
197268
Self::Ping(_) | Self::Pong(_) => 8,
@@ -218,14 +289,17 @@ impl RelayToClientMsg {
218289
);
219290

220291
let res = match frame_type {
221-
FrameType::RecvPacket => {
292+
FrameType::RelayToClientDatagrams => {
222293
snafu::ensure!(content.len() >= NodeId::LENGTH, InvalidFrameSnafu);
223294

224-
let src_key = cache
295+
let remote_node_id = cache
225296
.key_from_slice(&content[..NodeId::LENGTH])
226297
.context(InvalidPublicKeySnafu)?;
227-
let content = content.slice(NodeId::LENGTH..);
228-
Self::ReceivedPacket { src_key, content }
298+
let datagrams = Datagrams::from_bytes(content.slice(NodeId::LENGTH..))?;
299+
Self::Datagrams {
300+
remote_node_id,
301+
datagrams,
302+
}
229303
}
230304
FrameType::NodeGone => {
231305
snafu::ensure!(content.len() == NodeId::LENGTH, InvalidFrameSnafu);
@@ -282,7 +356,7 @@ impl RelayToClientMsg {
282356
impl ClientToRelayMsg {
283357
pub(crate) fn typ(&self) -> FrameType {
284358
match self {
285-
Self::SendPacket { .. } => FrameType::SendPacket,
359+
Self::Datagrams { .. } => FrameType::ClientToRelayDatagrams,
286360
Self::Ping { .. } => FrameType::Ping,
287361
Self::Pong { .. } => FrameType::Pong,
288362
}
@@ -298,9 +372,12 @@ impl ClientToRelayMsg {
298372
pub(crate) fn write_to<O: BufMut>(&self, mut dst: O) -> O {
299373
dst = self.typ().write_to(dst);
300374
match self {
301-
Self::SendPacket { dst_key, packet } => {
302-
dst.put(dst_key.as_ref());
303-
dst.put(packet.as_ref());
375+
Self::Datagrams {
376+
dst_node_id,
377+
datagrams,
378+
} => {
379+
dst.put(dst_node_id.as_ref());
380+
dst = datagrams.write_to(dst);
304381
}
305382
Self::Ping(data) => {
306383
dst.put(&data[..]);
@@ -315,9 +392,9 @@ impl ClientToRelayMsg {
315392
pub(crate) fn encoded_len(&self) -> usize {
316393
let payload_len = match self {
317394
Self::Ping(_) | Self::Pong(_) => 8,
318-
Self::SendPacket { packet, .. } => {
395+
Self::Datagrams { datagrams, .. } => {
319396
32 // node id
320-
+ packet.len()
397+
+ datagrams.encoded_len()
321398
}
322399
};
323400
1 // frame type (all frame types currently encode as 1 byte varint)
@@ -338,12 +415,15 @@ impl ClientToRelayMsg {
338415
);
339416

340417
let res = match frame_type {
341-
FrameType::SendPacket => {
342-
let dst_key = cache
418+
FrameType::ClientToRelayDatagrams => {
419+
let dst_node_id = cache
343420
.key_from_slice(&content[..NodeId::LENGTH])
344421
.context(InvalidPublicKeySnafu)?;
345-
let packet = content.slice(NodeId::LENGTH..);
346-
Self::SendPacket { dst_key, packet }
422+
let datagrams = Datagrams::from_bytes(content.slice(NodeId::LENGTH..))?;
423+
Self::Datagrams {
424+
dst_node_id,
425+
datagrams,
426+
}
347427
}
348428
FrameType::Ping => {
349429
snafu::ensure!(content.len() == 8, InvalidFrameSnafu);
@@ -419,9 +499,13 @@ mod tests {
419499
"0a 2a 2a 2a 2a 2a 2a 2a 2a",
420500
),
421501
(
422-
RelayToClientMsg::ReceivedPacket {
423-
src_key: client_key.public(),
424-
content: "Hello World!".into(),
502+
RelayToClientMsg::Datagrams {
503+
remote_node_id: client_key.public(),
504+
datagrams: Datagrams {
505+
ecn: Some(quinn::EcnCodepoint::Ce),
506+
segment_size: Some(6),
507+
contents: "Hello World!".into(),
508+
},
425509
}
426510
.write_to(Vec::new()),
427511
"06 19 7f 6b 23 e1 6c 85 32 c6 ab c8 38 fa cd 5e
@@ -455,9 +539,13 @@ mod tests {
455539
"0a 2a 2a 2a 2a 2a 2a 2a 2a",
456540
),
457541
(
458-
ClientToRelayMsg::SendPacket {
459-
dst_key: client_key.public(),
460-
packet: "Goodbye!".into(),
542+
ClientToRelayMsg::Datagrams {
543+
dst_node_id: client_key.public(),
544+
datagrams: Datagrams {
545+
ecn: Some(quinn::EcnCodepoint::Ce),
546+
segment_size: Some(6),
547+
contents: "Hello World!".into(),
548+
},
461549
}
462550
.write_to(Vec::new()),
463551
"04 19 7f 6b 23 e1 6c 85 32 c6 ab c8 38 fa cd 5e
@@ -485,22 +573,42 @@ mod proptests {
485573
secret_key().prop_map(|key| key.public())
486574
}
487575

488-
/// Generates random data, up to the maximum packet size minus the given number of bytes
489-
fn data(consumed: usize) -> impl Strategy<Value = Bytes> {
490-
let len = MAX_PACKET_SIZE - consumed;
491-
prop::collection::vec(any::<u8>(), 0..len).prop_map(Bytes::from)
576+
fn ecn() -> impl Strategy<Value = Option<quinn_proto::EcnCodepoint>> {
577+
(0..=3).prop_map(|n| match n {
578+
1 => Some(quinn_proto::EcnCodepoint::Ce),
579+
2 => Some(quinn_proto::EcnCodepoint::Ect0),
580+
3 => Some(quinn_proto::EcnCodepoint::Ect1),
581+
_ => None,
582+
})
583+
}
584+
585+
fn datagrams() -> impl Strategy<Value = Datagrams> {
586+
(
587+
ecn(),
588+
prop::option::of(MAX_PAYLOAD_SIZE / 20..MAX_PAYLOAD_SIZE),
589+
prop::collection::vec(any::<u8>(), 0..MAX_PAYLOAD_SIZE),
590+
)
591+
.prop_map(|(ecn, segment_size, data)| Datagrams {
592+
ecn,
593+
segment_size: segment_size.map(|ss| std::cmp::min(data.len(), ss) as u16),
594+
contents: Bytes::from(data),
595+
})
492596
}
493597

494598
/// Generates a random valid frame
495599
fn server_client_frame() -> impl Strategy<Value = RelayToClientMsg> {
496-
let recv_packet = (key(), data(32))
497-
.prop_map(|(src_key, content)| RelayToClientMsg::ReceivedPacket { src_key, content });
498-
let node_gone = key().prop_map(|node_id| RelayToClientMsg::NodeGone(node_id));
600+
let recv_packet = (key(), datagrams()).prop_map(|(remote_node_id, datagrams)| {
601+
RelayToClientMsg::Datagrams {
602+
remote_node_id,
603+
datagrams,
604+
}
605+
});
606+
let node_gone = key().prop_map(RelayToClientMsg::NodeGone);
499607
let ping = prop::array::uniform8(any::<u8>()).prop_map(RelayToClientMsg::Ping);
500608
let pong = prop::array::uniform8(any::<u8>()).prop_map(RelayToClientMsg::Pong);
501609
let health = ".{0,65536}"
502-
.prop_filter("exceeds max payload size", |s| {
503-
s.len() < 65536 // a single unicode character can match a regex "." but take up multiple bytes
610+
.prop_filter("exceeds MAX_PAYLOAD_SIZE", |s| {
611+
s.len() < MAX_PAYLOAD_SIZE // a single unicode character can match a regex "." but take up multiple bytes
504612
})
505613
.prop_map(|problem| RelayToClientMsg::Health { problem });
506614
let restarting = (any::<u32>(), any::<u32>()).prop_map(|(reconnect_in, try_for)| {
@@ -513,8 +621,11 @@ mod proptests {
513621
}
514622

515623
fn client_server_frame() -> impl Strategy<Value = ClientToRelayMsg> {
516-
let send_packet = (key(), data(32))
517-
.prop_map(|(dst_key, packet)| ClientToRelayMsg::SendPacket { dst_key, packet });
624+
let send_packet =
625+
(key(), datagrams()).prop_map(|(dst_node_id, datagrams)| ClientToRelayMsg::Datagrams {
626+
dst_node_id,
627+
datagrams,
628+
});
518629
let ping = prop::array::uniform8(any::<u8>()).prop_map(ClientToRelayMsg::Ping);
519630
let pong = prop::array::uniform8(any::<u8>()).prop_map(ClientToRelayMsg::Pong);
520631
prop_oneof![send_packet, ping, pong]
@@ -548,5 +659,12 @@ mod proptests {
548659
let actual_encoded_len = frame.to_bytes().len();
549660
prop_assert_eq!(claimed_encoded_len, actual_encoded_len);
550661
}
662+
663+
#[test]
664+
fn datagrams_encoded_len(datagrams in datagrams()) {
665+
let claimed_encoded_len = datagrams.encoded_len();
666+
let actual_encoded_len = datagrams.write_to(Vec::new()).len();
667+
prop_assert_eq!(claimed_encoded_len, actual_encoded_len);
668+
}
551669
}
552670
}

0 commit comments

Comments
 (0)