Skip to content

Commit d228474

Browse files
author
yngrtc
committed
only forward RTP/RTCP when local_srtp_context is ready
1 parent d8dec8f commit d228474

File tree

3 files changed

+68
-11
lines changed

3 files changed

+68
-11
lines changed

src/endpoint/transport.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -125,4 +125,8 @@ impl Transport {
125125
pub(crate) fn association_handle_and_stream_id(&self) -> (Option<usize>, Option<u16>) {
126126
(self.association_handle, self.stream_id)
127127
}
128+
129+
pub(crate) fn is_local_srtp_context_ready(&self) -> bool {
130+
self.local_srtp_context.is_some()
131+
}
128132
}

src/handler/gateway.rs

Lines changed: 63 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use crate::messages::{
99
};
1010
use crate::server::states::ServerStates;
1111
use bytes::BytesMut;
12-
use log::{debug, warn};
12+
use log::{debug, info, trace, warn};
1313
use retty::channel::{Handler, InboundContext, InboundHandler, OutboundContext, OutboundHandler};
1414
use retty::transport::TransportContext;
1515
use shared::error::{Error, Result};
@@ -275,6 +275,12 @@ impl GatewayInbound {
275275
endpoint_id, four_tuple
276276
)))?;
277277
transport.set_association_handle_and_stream_id(association_handle, stream_id);
278+
info!(
279+
"{}/{}: data channel is ready for {:?}",
280+
session_id,
281+
endpoint_id,
282+
transport.four_tuple()
283+
);
278284
endpoint.set_renegotiation_needed(!new_transceivers.is_empty());
279285

280286
let (mids, transceivers) = endpoint.get_mut_mids_and_transceivers();
@@ -336,7 +342,7 @@ impl GatewayInbound {
336342
let answer_str =
337343
serde_json::to_string(&answer).map_err(|err| Error::Other(err.to_string()))?;
338344

339-
let peers = GatewayInbound::get_other_transport_contexts(
345+
let peers = GatewayInbound::get_other_datachannel_transport_contexts(
340346
server_states,
341347
&transport_context,
342348
)?;
@@ -396,10 +402,10 @@ impl GatewayInbound {
396402
) -> Result<Vec<TaggedMessageEvent>> {
397403
//TODO: Selective Forwarding RTP Packets
398404
let peers =
399-
GatewayInbound::get_other_transport_contexts(server_states, &transport_context)?;
405+
GatewayInbound::get_other_media_transport_contexts(server_states, &transport_context)?;
400406

401407
let mut outgoing_messages = Vec::with_capacity(peers.len());
402-
for (transport, _, _, _) in peers {
408+
for transport in peers {
403409
outgoing_messages.push(TaggedMessageEvent {
404410
now,
405411
transport,
@@ -418,10 +424,10 @@ impl GatewayInbound {
418424
) -> Result<Vec<TaggedMessageEvent>> {
419425
//TODO: Selective Forwarding RTCP Packets
420426
let peers =
421-
GatewayInbound::get_other_transport_contexts(server_states, &transport_context)?;
427+
GatewayInbound::get_other_media_transport_contexts(server_states, &transport_context)?;
422428

423429
let mut outgoing_messages = Vec::with_capacity(peers.len());
424-
for (transport, _, _, _) in peers {
430+
for transport in peers {
425431
outgoing_messages.push(TaggedMessageEvent {
426432
now,
427433
transport,
@@ -483,7 +489,7 @@ impl GatewayInbound {
483489
}
484490
}
485491

486-
fn get_other_transport_contexts(
492+
fn get_other_datachannel_transport_contexts(
487493
server_states: &mut ServerStates,
488494
transport_context: &TransportContext,
489495
) -> Result<Vec<(TransportContext, usize, u16, bool)>> {
@@ -518,9 +524,56 @@ impl GatewayInbound {
518524
other_endpoint.is_renegotiation_needed(),
519525
));
520526
} else {
521-
warn!(
522-
"session id {}/endpoint id {}'s data channel is not ready",
523-
session_id, endpoint_id
527+
// data channel is not ready yet for other_endpoint_id's other_four_tuple.
528+
// this transport just joins, but data channel is still setup
529+
trace!(
530+
"{}/{}'s data channel is not ready yet for {:?} since it is still setup",
531+
session_id,
532+
other_endpoint_id,
533+
other_four_tuple,
534+
);
535+
}
536+
}
537+
}
538+
}
539+
Ok(peers)
540+
}
541+
542+
fn get_other_media_transport_contexts(
543+
server_states: &mut ServerStates,
544+
transport_context: &TransportContext,
545+
) -> Result<Vec<TransportContext>> {
546+
let four_tuple = transport_context.into();
547+
let (session_id, endpoint_id) = server_states
548+
.find_endpoint(&four_tuple)
549+
.ok_or(Error::ErrClientTransportNotSet)?;
550+
let session = server_states
551+
.get_session(&session_id)
552+
.ok_or(Error::Other(format!(
553+
"can't find session id {}",
554+
session_id
555+
)))?;
556+
557+
let mut peers = vec![];
558+
let endpoints = session.get_endpoints();
559+
for (&other_endpoint_id, other_endpoint) in endpoints.iter() {
560+
if other_endpoint_id != endpoint_id {
561+
let transports = other_endpoint.get_transports();
562+
for (other_four_tuple, other_transport) in transports.iter() {
563+
if other_transport.is_local_srtp_context_ready() {
564+
peers.push(TransportContext {
565+
local_addr: other_four_tuple.local_addr,
566+
peer_addr: other_four_tuple.peer_addr,
567+
ecn: transport_context.ecn,
568+
});
569+
} else {
570+
// local_srtp_context is not ready yet for other_endpoint_id's other_four_tuple.
571+
// this transport just joins, but local_srtp_context is still setup
572+
trace!(
573+
"{}/{}'s local_srtp_context is not ready yet for {:?} since it is still setup",
574+
session_id,
575+
other_endpoint_id,
576+
other_four_tuple,
524577
);
525578
}
526579
}

src/server/states.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ impl ServerStates {
182182
) {
183183
self.endpoints.insert(four_tuple, (session_id, endpoint_id));
184184
info!(
185-
"endpoint {}/{} is connected via {:?}",
185+
"{}/{} is connected via {:?}",
186186
session_id, endpoint_id, four_tuple
187187
)
188188
}

0 commit comments

Comments
 (0)