Skip to content

Commit c4f4031

Browse files
authored
Fix dtls handshake potential hanging issue (#631)
- The logic of handshake_tx is copied from Pion::dtls. However, tokio:: mpsc::channel with size 1 is used here, and it behaves differently from the golang::channel. The golang channel is actually a rendezvous channel (size=0 channel). It means that the sender blocks until the receiver receives, then both of them can proceed. However, the tokio::mpsc::sender can proceed as long as there is capacity in the channel. Channel with size=1 is not a rendezvous channel and tokio::mpsc::channel doesn't support size=0. therefore, I added an async block where oneshot channel is integrated in order to mimic the rendezvous channel behavior.
1 parent 78aa8f5 commit c4f4031

File tree

2 files changed

+45
-38
lines changed

2 files changed

+45
-38
lines changed

dtls/src/conn/mod.rs

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::sync::Arc;
1010
use async_trait::async_trait;
1111
use log::*;
1212
use portable_atomic::{AtomicBool, AtomicU16};
13-
use tokio::sync::{mpsc, Mutex};
13+
use tokio::sync::{mpsc, oneshot, Mutex};
1414
use tokio::time::Duration;
1515
use util::replay_detector::*;
1616
use util::Conn;
@@ -64,7 +64,8 @@ struct ConnReaderContext {
6464
cache: HandshakeCache,
6565
cipher_suite: Arc<Mutex<Option<Box<dyn CipherSuite + Send + Sync>>>>,
6666
remote_epoch: Arc<AtomicU16>,
67-
handshake_tx: mpsc::Sender<mpsc::Sender<()>>,
67+
// use additional oneshot sender to mimic rendezvous channel behavior
68+
handshake_tx: mpsc::Sender<(oneshot::Sender<()>, mpsc::Sender<()>)>,
6869
handshake_done_rx: mpsc::Receiver<()>,
6970
packet_tx: Arc<mpsc::Sender<PacketSendRequest>>,
7071
}
@@ -96,7 +97,8 @@ pub struct DTLSConn {
9697
pub(crate) flights: Option<Vec<Packet>>,
9798
pub(crate) cfg: HandshakeConfig,
9899
pub(crate) retransmit: bool,
99-
pub(crate) handshake_rx: mpsc::Receiver<mpsc::Sender<()>>,
100+
// use additional oneshot sender to mimic rendezvous channel behavior
101+
pub(crate) handshake_rx: mpsc::Receiver<(oneshot::Sender<()>, mpsc::Sender<()>)>,
100102

101103
pub(crate) packet_tx: Arc<mpsc::Sender<PacketSendRequest>>,
102104
pub(crate) handle_queue_tx: mpsc::Sender<mpsc::Sender<()>>,
@@ -830,9 +832,13 @@ impl DTLSConn {
830832

831833
if has_handshake {
832834
let (done_tx, mut done_rx) = mpsc::channel(1);
833-
835+
let rendezvous_at_handshake = async {
836+
let (rendezvous_tx, rendezvous_rx) = oneshot::channel();
837+
_ = ctx.handshake_tx.send((rendezvous_tx, done_tx)).await;
838+
rendezvous_rx.await
839+
};
834840
tokio::select! {
835-
_ = ctx.handshake_tx.send(done_tx) => {
841+
_ = rendezvous_at_handshake => {
836842
let mut wait_done_rx = true;
837843
while wait_done_rx{
838844
tokio::select!{

dtls/src/handshaker.rs

Lines changed: 34 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -330,45 +330,46 @@ impl DTLSConn {
330330

331331
loop {
332332
tokio::select! {
333-
done = self.handshake_rx.recv() =>{
334-
if done.is_none() {
333+
done_senders = self.handshake_rx.recv() =>{
334+
if done_senders.is_none() {
335335
trace!("[handshake:{}] {} handshake_tx is dropped", srv_cli_str(self.state.is_client), self.current_flight.to_string());
336336
return Err(Error::ErrAlertFatalOrClose);
337-
}
338-
339-
//trace!("[handshake:{}] {} received handshake_rx", srv_cli_str(self.state.is_client), self.current_flight.to_string());
340-
let result = self.current_flight.parse(&mut self.handle_queue_tx, &mut self.state, &self.cache, &self.cfg).await;
341-
drop(done);
342-
match result {
343-
Err((alert, mut err)) => {
344-
trace!("[handshake:{}] {} result alert:{:?}, err:{:?}",
345-
srv_cli_str(self.state.is_client),
346-
self.current_flight.to_string(),
347-
alert,
348-
err);
349-
350-
if let Some(alert) = alert {
351-
let alert_err = self.notify(alert.alert_level, alert.alert_description).await;
352-
353-
if let Err(alert_err) = alert_err {
354-
if err.is_some() {
355-
err = Some(alert_err);
337+
} else if let Some((rendezvous_tx, done_tx)) = done_senders {
338+
rendezvous_tx.send(()).ok();
339+
//trace!("[handshake:{}] {} received handshake_rx", srv_cli_str(self.state.is_client), self.current_flight.to_string());
340+
let result = self.current_flight.parse(&mut self.handle_queue_tx, &mut self.state, &self.cache, &self.cfg).await;
341+
drop(done_tx);
342+
match result {
343+
Err((alert, mut err)) => {
344+
trace!("[handshake:{}] {} result alert:{:?}, err:{:?}",
345+
srv_cli_str(self.state.is_client),
346+
self.current_flight.to_string(),
347+
alert,
348+
err);
349+
350+
if let Some(alert) = alert {
351+
let alert_err = self.notify(alert.alert_level, alert.alert_description).await;
352+
353+
if let Err(alert_err) = alert_err {
354+
if err.is_some() {
355+
err = Some(alert_err);
356+
}
356357
}
357358
}
359+
if let Some(err) = err {
360+
return Err(err);
361+
}
358362
}
359-
if let Some(err) = err {
360-
return Err(err);
361-
}
362-
}
363-
Ok(next_flight) => {
364-
trace!("[handshake:{}] {} -> {}", srv_cli_str(self.state.is_client), self.current_flight.to_string(), next_flight.to_string());
365-
if next_flight.is_last_recv_flight() && self.current_flight.to_string() == next_flight.to_string() {
366-
return Ok(HandshakeState::Finished);
363+
Ok(next_flight) => {
364+
trace!("[handshake:{}] {} -> {}", srv_cli_str(self.state.is_client), self.current_flight.to_string(), next_flight.to_string());
365+
if next_flight.is_last_recv_flight() && self.current_flight.to_string() == next_flight.to_string() {
366+
return Ok(HandshakeState::Finished);
367+
}
368+
self.current_flight = next_flight;
369+
return Ok(HandshakeState::Preparing);
367370
}
368-
self.current_flight = next_flight;
369-
return Ok(HandshakeState::Preparing);
370-
}
371-
};
371+
};
372+
}
372373
}
373374

374375
_ = retransmit_timer.as_mut() =>{

0 commit comments

Comments
 (0)