Skip to content

Commit d6c7c0b

Browse files
author
yngrtc
committed
fix-interceptor-poll_write-bug
1 parent 9bcbede commit d6c7c0b

File tree

4 files changed

+18
-7
lines changed

4 files changed

+18
-7
lines changed

examples/sync_signal/mod.rs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ pub fn sync_run(
117117
}
118118
};
119119

120-
write_socket_output(&socket, &pipeline);
120+
write_socket_output(&socket, &pipeline)?;
121121

122122
// Spawn new incoming signal message from the signaling server thread.
123123
if let Ok(signal_message) = rx.try_recv() {
@@ -161,12 +161,13 @@ pub fn sync_run(
161161
fn write_socket_output(
162162
socket: &UdpSocket,
163163
pipeline: &Rc<Pipeline<TaggedBytesMut, TaggedBytesMut>>,
164-
) {
164+
) -> anyhow::Result<()>{
165165
while let Some(transmit) = pipeline.poll_transmit() {
166166
socket
167-
.send_to(&transmit.message, transmit.transport.peer_addr)
168-
.expect("sending UDP data");
167+
.send_to(&transmit.message, transmit.transport.peer_addr)?;
169168
}
169+
170+
Ok(())
170171
}
171172

172173
fn read_socket_input(socket: &UdpSocket, buf: &mut [u8]) -> Option<TaggedBytesMut> {

src/handler/gateway.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -386,6 +386,8 @@ impl GatewayHandler {
386386
transport_context: TransportContext,
387387
rtp_packet: rtp::packet::Packet,
388388
) -> Result<Vec<TaggedMessageEvent>> {
389+
debug!("handle_rtp_message {}", transport_context.peer_addr);
390+
389391
//TODO: Selective Forwarding RTP Packets
390392
let peers =
391393
GatewayHandler::get_other_media_transport_contexts(server_states, &transport_context)?;
@@ -408,6 +410,8 @@ impl GatewayHandler {
408410
transport_context: TransportContext,
409411
rtcp_packets: Vec<Box<dyn rtcp::packet::Packet>>,
410412
) -> Result<Vec<TaggedMessageEvent>> {
413+
debug!("handle_rtcp_message {}", transport_context.peer_addr);
414+
411415
//TODO: Selective Forwarding RTCP Packets
412416
let peers =
413417
GatewayHandler::get_other_media_transport_contexts(server_states, &transport_context)?;

src/handler/interceptor.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ use crate::interceptor::InterceptorEvent;
22
use crate::messages::{MessageEvent, RTPMessageEvent, TaggedMessageEvent};
33
use crate::types::FourTuple;
44
use crate::ServerStates;
5-
use log::error;
5+
use log::{debug, error};
66
use retty::channel::{Context, Handler};
77
use shared::error::Result;
88
use std::cell::RefCell;
@@ -56,6 +56,7 @@ impl Handler for InterceptorHandler {
5656
for event in events {
5757
match event {
5858
InterceptorEvent::Inbound(inbound) => {
59+
debug!("interceptor forward Rtcp {:?}", msg.transport.peer_addr);
5960
ctx.fire_read(inbound);
6061
}
6162
InterceptorEvent::Outbound(outbound) => {
@@ -77,10 +78,12 @@ impl Handler for InterceptorHandler {
7778
if let MessageEvent::Rtp(RTPMessageEvent::Rtcp(_)) = &msg.message {
7879
// RTCP message read must end here in SFU case. If any rtcp packet needs to be forwarded to other Endpoints,
7980
// just add a new interceptor to forward it.
81+
debug!("interceptor terminates Rtcp {:?}", msg.transport.peer_addr);
8082
return;
8183
}
8284
}
8385

86+
debug!("interceptor read bypass {:?}", msg.transport.peer_addr);
8487
ctx.fire_read(msg);
8588
}
8689

@@ -197,6 +200,9 @@ impl Handler for InterceptorHandler {
197200
}
198201
};
199202
}
203+
204+
debug!("interceptor write {:?}", msg.transport.peer_addr);
205+
self.transmits.push_back(msg);
200206
}
201207

202208
self.transmits.pop_front()

src/handler/stun.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ impl Handler for StunHandler {
5858
}
5959
}
6060
} else {
61-
debug!("bypass StunInbound for {}", msg.transport.peer_addr);
61+
debug!("bypass StunHandler read for {}", msg.transport.peer_addr);
6262
ctx.fire_read(msg);
6363
}
6464
}
@@ -81,7 +81,7 @@ impl Handler for StunHandler {
8181
message: MessageEvent::Stun(STUNMessageEvent::Raw(message)),
8282
})
8383
} else {
84-
debug!("bypass StunOutbound for {}", msg.transport.peer_addr);
84+
debug!("bypass StunHandler write for {}", msg.transport.peer_addr);
8585
Some(msg)
8686
}
8787
} else {

0 commit comments

Comments
 (0)