@@ -136,10 +136,6 @@ struct ActiveRelayActor {
136
136
prio_inbox : mpsc:: Receiver < ActiveRelayPrioMessage > ,
137
137
/// Inbox for messages which involve sending to the relay server.
138
138
inbox : mpsc:: Receiver < ActiveRelayMessage > ,
139
- /// Queue for received relay datagrams.
140
- relay_datagrams_recv : Arc < RelayDatagramRecvQueue > ,
141
- /// Queue for received relay disco packets.
142
- relay_disco_recv : mpsc:: Sender < RelayDiscoMessage > ,
143
139
/// Channel on which we queue packets to send to the relay.
144
140
relay_datagrams_send : mpsc:: Receiver < RelaySendItem > ,
145
141
@@ -162,20 +158,7 @@ struct ActiveRelayActor {
162
158
/// Token indicating the [`ActiveRelayActor`] should stop.
163
159
stop_token : CancellationToken ,
164
160
metrics : Arc < MagicsockMetrics > ,
165
- /// Received relay packets that could not yet be forwarded to the magicsocket.
166
- pending_received : Option < PendingRecv > ,
167
- }
168
-
169
- #[ derive( Debug ) ]
170
- struct PendingRecv {
171
- packet_iter : PacketSplitIter ,
172
- blocked_on : RecvPath ,
173
- }
174
-
175
- #[ derive( Debug ) ]
176
- enum RecvPath {
177
- Data ,
178
- Disco ,
161
+ receive_queue : ReceiveQueue ,
179
162
}
180
163
181
164
#[ derive( Debug ) ]
@@ -271,16 +254,18 @@ impl ActiveRelayActor {
271
254
ActiveRelayActor {
272
255
prio_inbox,
273
256
inbox,
274
- relay_datagrams_recv,
275
257
relay_datagrams_send,
276
- relay_disco_recv,
277
258
url,
278
259
relay_client_builder,
279
260
is_home_relay : false ,
280
261
inactive_timeout : Box :: pin ( time:: sleep ( RELAY_INACTIVE_CLEANUP_TIME ) ) ,
281
262
stop_token,
282
263
metrics,
283
- pending_received : None ,
264
+ receive_queue : ReceiveQueue {
265
+ relay_datagrams_recv,
266
+ relay_disco_recv,
267
+ pending : None ,
268
+ } ,
284
269
}
285
270
}
286
271
@@ -630,8 +615,8 @@ impl ActiveRelayActor {
630
615
let fut = client_sink. send_all( & mut packet_stream) ;
631
616
self . run_sending( fut, & mut state, & mut client_stream) . await ?;
632
617
}
633
- _ = forward_pending ( & mut self . pending_received , & self . relay_datagrams_recv , & mut self . relay_disco_recv ) , if self . pending_received . is_some ( ) => { }
634
- msg = client_stream. next( ) , if self . pending_received . is_none ( ) => {
618
+ ( ) = self . receive_queue . forward_pending ( ) , if self . receive_queue . is_pending ( ) => { }
619
+ msg = client_stream. next( ) , if ! self . receive_queue . is_pending ( ) => {
635
620
let Some ( msg) = msg else {
636
621
break Err ( anyhow!( "Stream closed by server." ) ) ;
637
622
} ;
@@ -678,15 +663,8 @@ impl ActiveRelayActor {
678
663
state. last_packet_src = Some ( remote_node_id) ;
679
664
state. nodes_present . insert ( remote_node_id) ;
680
665
}
681
- let packet_iter = PacketSplitIter :: new ( self . url . clone ( ) , remote_node_id, data) ;
682
- if let Some ( pending) = handle_received_packet_iter (
683
- packet_iter,
684
- None ,
685
- & self . relay_datagrams_recv ,
686
- & mut self . relay_disco_recv ,
687
- ) {
688
- self . pending_received = Some ( pending) ;
689
- }
666
+ let packets = PacketSplitIter :: new ( self . url . clone ( ) , remote_node_id, data) ;
667
+ self . receive_queue . queue_packets ( packets) ;
690
668
}
691
669
ReceivedMessage :: NodeGone ( node_id) => {
692
670
state. nodes_present . remove ( & node_id) ;
@@ -769,8 +747,8 @@ impl ActiveRelayActor {
769
747
break Err ( anyhow!( "Ping timeout" ) ) ;
770
748
}
771
749
// No need to read the inbox or datagrams to send.
772
- _ = forward_pending ( & mut self . pending_received , & self . relay_datagrams_recv , & mut self . relay_disco_recv ) , if self . pending_received . is_some ( ) => { }
773
- msg = client_stream. next( ) , if self . pending_received . is_none ( ) => {
750
+ ( ) = self . receive_queue . forward_pending ( ) , if self . receive_queue . is_pending ( ) => { }
751
+ msg = client_stream. next( ) , if ! self . receive_queue . is_pending ( ) => {
774
752
let Some ( msg) = msg else {
775
753
break Err ( anyhow!( "Stream closed by server." ) ) ;
776
754
} ;
@@ -789,103 +767,128 @@ impl ActiveRelayActor {
789
767
}
790
768
}
791
769
792
- /// Forward pending received packets to their queues.
793
- ///
794
- /// If `maybe_pending` is not empty, this will wait for the path the last received item
795
- /// is blocked on (via [`PendingRecv::blocked_on`]) to become unblocked. It will then forward
796
- /// the pending items, until a queue is blocked again. In that case, the remaining items will
797
- /// be put back into `maybe_pending`. If all items could be sent, `maybe_pending` will be set
798
- /// to `None`.
799
- ///
800
- /// This function is cancellation-safe: If the future is dropped at any point, all items are guaranteed
801
- /// to either be sent into their respective queues, or are still in `maybe_pending`.
802
- async fn forward_pending (
803
- maybe_pending : & mut Option < PendingRecv > ,
804
- relay_datagrams_recv : & RelayDatagramRecvQueue ,
805
- relay_disco_recv : & mut mpsc:: Sender < RelayDiscoMessage > ,
806
- ) {
807
- // We take a mutable reference onto the inner value.
808
- // we're not `take`ing it here, because this would make the function not cancellation safe.
809
- let Some ( ref mut pending) = maybe_pending else {
810
- return ;
811
- } ;
812
- let disco_permit = match pending. blocked_on {
813
- RecvPath :: Data => {
814
- std:: future:: poll_fn ( |cx| relay_datagrams_recv. poll_send_ready ( cx) )
815
- . await
816
- . ok ( ) ;
817
- None
818
- }
819
- RecvPath :: Disco => {
820
- let Ok ( permit) = relay_disco_recv. clone ( ) . reserve_owned ( ) . await else {
821
- return ;
822
- } ;
823
- Some ( permit)
824
- }
825
- } ;
826
- // We now take the inner value by value. it is cancellation safe here because
827
- // no further `await`s occur after here.
828
- // The unwrap is guaranteed to be safe because we checked above that it is not none.
829
- #[ allow( clippy:: unwrap_used, reason = "checked above" ) ]
830
- let pending = maybe_pending. take ( ) . unwrap ( ) ;
831
- if let Some ( pending) = handle_received_packet_iter (
832
- pending. packet_iter ,
833
- disco_permit,
834
- relay_datagrams_recv,
835
- relay_disco_recv,
836
- ) {
837
- * maybe_pending = Some ( pending) ;
838
- }
770
+ #[ derive( Debug ) ]
771
+ struct ReceiveQueue {
772
+ /// Received relay packets that could not yet be forwarded to the magicsocket.
773
+ pending : Option < PendingRecv > ,
774
+ /// Queue for received relay datagrams.
775
+ relay_datagrams_recv : Arc < RelayDatagramRecvQueue > ,
776
+ /// Queue for received relay disco packets.
777
+ relay_disco_recv : mpsc:: Sender < RelayDiscoMessage > ,
778
+ }
779
+
780
+ #[ derive( Debug ) ]
781
+ struct PendingRecv {
782
+ packets : PacketSplitIter ,
783
+ blocked_on : RecvPath ,
784
+ }
785
+
786
+ #[ derive( Debug ) ]
787
+ enum RecvPath {
788
+ Data ,
789
+ Disco ,
839
790
}
840
791
841
- fn handle_received_packet_iter (
842
- mut packet_iter : PacketSplitIter ,
843
- mut disco_permit : Option < OwnedPermit < RelayDiscoMessage > > ,
844
- relay_datagrams_recv : & RelayDatagramRecvQueue ,
845
- relay_disco_recv : & mut mpsc:: Sender < RelayDiscoMessage > ,
846
- ) -> Option < PendingRecv > {
847
- let remote_node_id = packet_iter. remote_node_id ( ) ;
848
- for datagram in & mut packet_iter {
849
- let Ok ( datagram) = datagram else {
850
- warn ! ( "Invalid packet split" ) ;
851
- return None ;
792
+ impl ReceiveQueue {
793
+ fn is_pending ( & self ) -> bool {
794
+ self . pending . is_some ( )
795
+ }
796
+
797
+ fn queue_packets ( & mut self , packets : PacketSplitIter ) {
798
+ debug_assert ! (
799
+ !self . is_pending( ) ,
800
+ "ReceiveQueue::queue_packets may not be called if is_pending() returns true"
801
+ ) ;
802
+ self . handle_packets ( packets, None ) ;
803
+ }
804
+
805
+ /// Forward pending received packets to their queues.
806
+ ///
807
+ /// This will wait for the path the last received item is blocked on (via [`PendingRecv::blocked_on`])
808
+ /// to become unblocked. It will then forward the pending items, until a queue is blocked again.
809
+ /// In that case, the remaining items will be stored and [`Self::is_pending`] returns true.
810
+ ///
811
+ /// This function is cancellation-safe: If the future is dropped at any point, all items are guaranteed
812
+ /// to either be sent into their respective queues or preserved here.
813
+ async fn forward_pending ( & mut self ) {
814
+ // We take a reference onto the inner value.
815
+ // we're not `take`ing it here, because this would make the function not cancellation safe.
816
+ let Some ( ref pending) = self . pending else {
817
+ return ;
852
818
} ;
853
- match crate :: disco:: source_and_box_bytes ( & datagram. buf ) {
854
- Some ( ( source, sealed_box) ) => {
855
- if remote_node_id != source {
856
- // TODO: return here?
857
- warn ! ( "Received relay disco message from connection for {}, but with message from {}" , remote_node_id. fmt_short( ) , source. fmt_short( ) ) ;
858
- }
859
- let message = RelayDiscoMessage {
860
- source,
861
- sealed_box,
862
- relay_url : datagram. url . clone ( ) ,
863
- relay_remote_node_id : datagram. src ,
819
+ let disco_permit = match pending. blocked_on {
820
+ RecvPath :: Data => {
821
+ std:: future:: poll_fn ( |cx| self . relay_datagrams_recv . poll_send_ready ( cx) )
822
+ . await
823
+ . ok ( ) ;
824
+ None
825
+ }
826
+ RecvPath :: Disco => {
827
+ let Ok ( permit) = self . relay_disco_recv . clone ( ) . reserve_owned ( ) . await else {
828
+ return ;
864
829
} ;
865
- if let Some ( permit) = disco_permit. take ( ) {
866
- permit. send ( message) ;
867
- } else if let Err ( err) = relay_disco_recv. try_send ( message) {
868
- warn ! ( "Dropping received relay disco packet: {err:#}" ) ;
869
- packet_iter. push_front ( datagram) ;
870
- return Some ( PendingRecv {
871
- packet_iter,
872
- blocked_on : RecvPath :: Disco ,
873
- } ) ;
874
- }
830
+ Some ( permit)
875
831
}
876
- None => {
877
- if let Err ( err) = relay_datagrams_recv. try_send ( datagram) {
878
- warn ! ( "Dropping received relay data packet: {err:#}" ) ;
879
- packet_iter. push_front ( err. into_inner ( ) ) ;
880
- return Some ( PendingRecv {
881
- packet_iter,
882
- blocked_on : RecvPath :: Data ,
883
- } ) ;
832
+ } ;
833
+ let packets = self
834
+ . pending
835
+ . take ( )
836
+ . expect ( "checked to be not empty" )
837
+ . packets ;
838
+ self . handle_packets ( packets, disco_permit) ;
839
+ }
840
+
841
+ fn handle_packets (
842
+ & mut self ,
843
+ mut packet_iter : PacketSplitIter ,
844
+ mut disco_permit : Option < OwnedPermit < RelayDiscoMessage > > ,
845
+ ) {
846
+ let remote_node_id = packet_iter. remote_node_id ( ) ;
847
+ for datagram in & mut packet_iter {
848
+ let Ok ( datagram) = datagram else {
849
+ warn ! ( "Invalid packet split" ) ;
850
+ self . pending = None ;
851
+ return ;
852
+ } ;
853
+ match crate :: disco:: source_and_box_bytes ( & datagram. buf ) {
854
+ Some ( ( source, sealed_box) ) => {
855
+ if remote_node_id != source {
856
+ // TODO: return here?
857
+ warn ! ( "Received relay disco message from connection for {}, but with message from {}" , remote_node_id. fmt_short( ) , source. fmt_short( ) ) ;
858
+ }
859
+ let message = RelayDiscoMessage {
860
+ source,
861
+ sealed_box,
862
+ relay_url : datagram. url . clone ( ) ,
863
+ relay_remote_node_id : datagram. src ,
864
+ } ;
865
+ if let Some ( permit) = disco_permit. take ( ) {
866
+ permit. send ( message) ;
867
+ } else if let Err ( err) = self . relay_disco_recv . try_send ( message) {
868
+ warn ! ( "Relay disco receive queue blocked: {err}" ) ;
869
+ packet_iter. push_front ( datagram) ;
870
+ self . pending = Some ( PendingRecv {
871
+ packets : packet_iter,
872
+ blocked_on : RecvPath :: Disco ,
873
+ } ) ;
874
+ return ;
875
+ }
876
+ }
877
+ None => {
878
+ if let Err ( err) = self . relay_datagrams_recv . try_send ( datagram) {
879
+ warn ! ( "Relay data receive queue blocked: {err}" ) ;
880
+ packet_iter. push_front ( err. into_inner ( ) ) ;
881
+ self . pending = Some ( PendingRecv {
882
+ packets : packet_iter,
883
+ blocked_on : RecvPath :: Data ,
884
+ } ) ;
885
+ return ;
886
+ }
884
887
}
885
888
}
886
889
}
890
+ self . pending = None ;
887
891
}
888
- None
889
892
}
890
893
891
894
/// Shared state when the [`ActiveRelayActor`] is connected to a relay server.
0 commit comments