Skip to content

Commit cf3b7a4

Browse files
authored
Fix turn relay client fd leak (#514)
* fix: missing close relay conn when happen error * fix: make sure that the listening loop must exit when the client closes
1 parent 352aca1 commit cf3b7a4

File tree

4 files changed

+50
-23
lines changed

4 files changed

+50
-23
lines changed

Cargo.lock

Lines changed: 3 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

ice/src/agent/agent_gather.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -820,8 +820,8 @@ impl Agent {
820820
return Ok(());
821821
}
822822

823-
let relay_conn = match client.allocate().await {
824-
Ok(conn) => conn,
823+
let relay_conn: Arc<dyn Conn + Send + Sync> = match client.allocate().await {
824+
Ok(conn) => Arc::new(conn),
825825
Err(err) => {
826826
let _ = client.close().await;
827827
log::warn!(
@@ -841,7 +841,7 @@ impl Agent {
841841
address: raddr.ip().to_string(),
842842
port: raddr.port(),
843843
component: COMPONENT_RTP,
844-
conn: Some(Arc::new(relay_conn)),
844+
conn: Some(Arc::clone(&relay_conn)),
845845
..CandidateBaseConfig::default()
846846
},
847847
rel_addr,
@@ -853,6 +853,7 @@ impl Agent {
853853
match relay_config.new_candidate_relay() {
854854
Ok(candidate) => Arc::new(candidate),
855855
Err(err) => {
856+
let _ = relay_conn.close().await;
856857
let _ = client.close().await;
857858
log::warn!(
858859
"[{}]: Failed to create relay candidate: {} {}: {}",

turn/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ util = { version = "0.8", path = "../util", package = "webrtc-util", default-fea
1414
stun = { version = "0.5", path = "../stun" }
1515

1616
tokio = { version = "1.32.0", features = ["full"] }
17+
tokio-util = "0.7"
1718
futures = "0.3"
1819
async-trait = "0.1"
1920
log = "0.4"

turn/src/client/mod.rs

Lines changed: 42 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,10 @@ use stun::integrity::*;
2424
use stun::message::*;
2525
use stun::textattrs::*;
2626
use stun::xoraddr::*;
27+
use tokio::pin;
28+
use tokio::select;
2729
use tokio::sync::{mpsc, Mutex};
30+
use tokio_util::sync::CancellationToken;
2831
use transaction::*;
2932
use util::conn::*;
3033
use util::vnet::net::*;
@@ -78,6 +81,7 @@ struct ClientInternal {
7881
binding_mgr: Arc<Mutex<BindingManager>>,
7982
rto_in_ms: u16,
8083
read_ch_tx: Arc<Mutex<Option<mpsc::Sender<InboundData>>>>,
84+
close_notify: CancellationToken,
8185
}
8286

8387
#[async_trait]
@@ -210,6 +214,7 @@ impl ClientInternal {
210214
},
211215
integrity: MessageIntegrity::new_short_term_integrity(String::new()),
212216
read_ch_tx: Arc::new(Mutex::new(None)),
217+
close_notify: CancellationToken::new(),
213218
})
214219
}
215220

@@ -227,33 +232,51 @@ impl ClientInternal {
227232
let tr_map = Arc::clone(&self.tr_map);
228233
let read_ch_tx = Arc::clone(&self.read_ch_tx);
229234
let binding_mgr = Arc::clone(&self.binding_mgr);
235+
let close_notify = self.close_notify.clone();
230236

231237
tokio::spawn(async move {
232238
let mut buf = vec![0u8; MAX_DATA_BUFFER_SIZE];
239+
let wait_cancel = close_notify.cancelled();
240+
pin!(wait_cancel);
241+
233242
loop {
234-
//TODO: gracefully exit loop
235-
let (n, from) = match conn.recv_from(&mut buf).await {
236-
Ok((n, from)) => (n, from),
237-
Err(err) => {
238-
log::debug!("exiting read loop: {}", err);
243+
let (n, from) = select! {
244+
biased;
245+
246+
_ = &mut wait_cancel => {
247+
log::debug!("exiting read loop");
239248
break;
249+
},
250+
result = conn.recv_from(&mut buf) => match result {
251+
Ok((n, from)) => (n, from),
252+
Err(err) => {
253+
log::debug!("exiting read loop: {}", err);
254+
break;
255+
}
240256
}
241257
};
242-
243258
log::debug!("received {} bytes of udp from {}", n, from);
244259

245-
if let Err(err) = ClientInternal::handle_inbound(
246-
&read_ch_tx,
247-
&buf[..n],
248-
from,
249-
&stun_serv_str,
250-
&tr_map,
251-
&binding_mgr,
252-
)
253-
.await
254-
{
255-
log::debug!("exiting read loop: {}", err);
256-
break;
260+
select! {
261+
biased;
262+
263+
_ = &mut wait_cancel => {
264+
log::debug!("exiting read loop");
265+
break;
266+
},
267+
result = ClientInternal::handle_inbound(
268+
&read_ch_tx,
269+
&buf[..n],
270+
from,
271+
&stun_serv_str,
272+
&tr_map,
273+
&binding_mgr,
274+
) => {
275+
if let Err(err) = result {
276+
log::debug!("exiting read loop: {}", err);
277+
break;
278+
}
279+
}
257280
}
258281
}
259282
});
@@ -430,6 +453,7 @@ impl ClientInternal {
430453

431454
/// Closes this client.
432455
async fn close(&mut self) {
456+
self.close_notify.cancel();
433457
{
434458
let mut read_ch_tx = self.read_ch_tx.lock().await;
435459
read_ch_tx.take();

0 commit comments

Comments
 (0)