Skip to content

Commit be438ed

Browse files
authored
Fix SCTP send_init/handle_init race (#615)
* sctp: Don't derive Default for Stream and AssociationInternal These structures should never be created using defaults. Which also makes channel senders mandatory (not Optional). * sctp: Simplify association creation * sctp: Log error for unexpected INIT * sctp: Hold association lock when sending client init This fixes a race where read_loop may process incoming init and going to established before send_init() is called.
1 parent c7cfe3c commit be438ed

File tree

6 files changed

+206
-187
lines changed

6 files changed

+206
-187
lines changed

data/src/data_channel/mod.rs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ pub struct Config {
3737
}
3838

3939
/// DataChannel represents a data channel
40-
#[derive(Debug, Default, Clone)]
40+
#[derive(Debug, Clone)]
4141
pub struct DataChannel {
4242
pub config: Config,
4343
stream: Arc<Stream>,
@@ -54,7 +54,11 @@ impl DataChannel {
5454
Self {
5555
config,
5656
stream,
57-
..Default::default()
57+
58+
messages_sent: Arc::new(AtomicUsize::default()),
59+
messages_received: Arc::new(AtomicUsize::default()),
60+
bytes_sent: Arc::new(AtomicUsize::default()),
61+
bytes_received: Arc::new(AtomicUsize::default()),
5862
}
5963
}
6064

@@ -404,17 +408,6 @@ pub struct PollDataChannel {
404408

405409
impl PollDataChannel {
406410
/// Constructs a new `PollDataChannel`.
407-
///
408-
/// # Examples
409-
///
410-
/// ```
411-
/// use webrtc_data::data_channel::{DataChannel, PollDataChannel, Config};
412-
/// use sctp::stream::Stream;
413-
/// use std::sync::Arc;
414-
///
415-
/// let dc = Arc::new(DataChannel::new(Arc::new(Stream::default()), Config::default()));
416-
/// let poll_dc = PollDataChannel::new(dc);
417-
/// ```
418411
pub fn new(data_channel: Arc<DataChannel>) -> Self {
419412
Self {
420413
data_channel,

sctp/src/association/association_internal.rs

Lines changed: 92 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -9,14 +9,13 @@ use crate::param::param_forward_tsn_supported::ParamForwardTsnSupported;
99
use crate::param::param_type::ParamType;
1010
use crate::param::param_unrecognized::ParamUnrecognized;
1111

12-
#[derive(Default)]
1312
pub struct AssociationInternal {
1413
pub(crate) name: String,
1514
pub(crate) state: Arc<AtomicU8>,
1615
pub(crate) max_message_size: Arc<AtomicU32>,
1716
pub(crate) inflight_queue_length: Arc<AtomicUsize>,
1817
pub(crate) will_send_shutdown: Arc<AtomicBool>,
19-
awake_write_loop_ch: Option<Arc<mpsc::Sender<()>>>,
18+
awake_write_loop_ch: Arc<mpsc::Sender<()>>,
2019

2120
peer_verification_tag: u32,
2221
pub(crate) my_verification_tag: u32,
@@ -77,11 +76,8 @@ pub struct AssociationInternal {
7776
streams: HashMap<u16, Arc<Stream>>,
7877

7978
close_loop_ch_tx: Option<broadcast::Sender<()>>,
80-
accept_ch_tx: Option<mpsc::Sender<Arc<Stream>>>,
81-
handshake_completed_ch_tx: Option<mpsc::Sender<Option<Error>>>,
82-
83-
// local error
84-
silent_error: Option<Error>,
79+
accept_ch_tx: mpsc::Sender<Arc<Stream>>,
80+
handshake_completed_ch_tx: mpsc::Sender<Option<Error>>,
8581

8682
// per inbound packet context
8783
delayed_ack_triggered: bool,
@@ -118,58 +114,94 @@ impl AssociationInternal {
118114
if tsn == 0 {
119115
tsn += 1;
120116
}
121-
let mut a = AssociationInternal {
117+
118+
let mtu = INITIAL_MTU;
119+
// RFC 4690 Sec 7.2.1
120+
// o The initial cwnd before DATA transmission or after a sufficiently
121+
// long idle period MUST be set to min(4*MTU, max (2*MTU, 4380
122+
// bytes)).
123+
// TODO: Consider whether this should use `clamp`
124+
#[allow(clippy::manual_clamp)]
125+
let cwnd = std::cmp::min(4 * mtu, std::cmp::max(2 * mtu, 4380));
126+
127+
let ret = AssociationInternal {
122128
name: config.name,
123-
max_receive_buffer_size,
129+
state: Arc::new(AtomicU8::new(AssociationState::Closed as u8)),
124130
max_message_size: Arc::new(AtomicU32::new(max_message_size)),
125131

126-
my_max_num_outbound_streams: u16::MAX,
132+
will_send_shutdown: Arc::new(AtomicBool::default()),
133+
awake_write_loop_ch,
134+
peer_verification_tag: 0,
135+
my_verification_tag: random::<u32>(),
136+
137+
my_next_tsn: tsn,
138+
peer_last_tsn: 0,
139+
min_tsn2measure_rtt: tsn,
140+
will_send_forward_tsn: false,
141+
will_retransmit_fast: false,
142+
will_retransmit_reconfig: false,
143+
will_send_shutdown_ack: false,
144+
will_send_shutdown_complete: false,
145+
146+
my_next_rsn: tsn,
147+
reconfigs: HashMap::new(),
148+
reconfig_requests: HashMap::new(),
149+
150+
source_port: 0,
151+
destination_port: 0,
127152
my_max_num_inbound_streams: u16::MAX,
153+
my_max_num_outbound_streams: u16::MAX,
154+
my_cookie: None,
128155
payload_queue: PayloadQueue::new(Arc::new(AtomicUsize::new(0))),
129156
inflight_queue: PayloadQueue::new(Arc::clone(&inflight_queue_length)),
130157
inflight_queue_length,
131158
pending_queue: Arc::new(PendingQueue::new()),
132159
control_queue: ControlQueue::new(),
133-
mtu: INITIAL_MTU,
134-
max_payload_size: INITIAL_MTU - (COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE),
135-
my_verification_tag: random::<u32>(),
136-
my_next_tsn: tsn,
137-
my_next_rsn: tsn,
138-
min_tsn2measure_rtt: tsn,
139-
state: Arc::new(AtomicU8::new(AssociationState::Closed as u8)),
160+
mtu,
161+
max_payload_size: mtu - (COMMON_HEADER_SIZE + DATA_CHUNK_HEADER_SIZE),
162+
cumulative_tsn_ack_point: tsn - 1,
163+
advanced_peer_tsn_ack_point: tsn - 1,
164+
use_forward_tsn: false,
165+
166+
max_receive_buffer_size,
167+
cwnd,
168+
rwnd: 0,
169+
ssthresh: 0,
170+
partial_bytes_acked: 0,
171+
in_fast_recovery: false,
172+
fast_recover_exit_point: 0,
173+
140174
rto_mgr: RtoManager::new(),
175+
t1init: None,
176+
t1cookie: None,
177+
t2shutdown: None,
178+
t3rtx: None,
179+
treconfig: None,
180+
ack_timer: None,
181+
182+
stored_init: None,
183+
stored_cookie_echo: None,
141184
streams: HashMap::new(),
142-
reconfigs: HashMap::new(),
143-
reconfig_requests: HashMap::new(),
144-
accept_ch_tx: Some(accept_ch_tx),
145185
close_loop_ch_tx: Some(close_loop_ch_tx),
146-
handshake_completed_ch_tx: Some(handshake_completed_ch_tx),
147-
cumulative_tsn_ack_point: tsn - 1,
148-
advanced_peer_tsn_ack_point: tsn - 1,
149-
silent_error: Some(Error::ErrSilentlyDiscard),
186+
accept_ch_tx,
187+
handshake_completed_ch_tx,
188+
189+
delayed_ack_triggered: false,
190+
immediate_ack_triggered: false,
150191
stats: Arc::new(AssociationStats::default()),
151-
awake_write_loop_ch: Some(awake_write_loop_ch),
152-
..Default::default()
192+
ack_state: AckState::default(),
193+
ack_mode: AckMode::default(),
153194
};
154195

155-
// RFC 4690 Sec 7.2.1
156-
// o The initial cwnd before DATA transmission or after a sufficiently
157-
// long idle period MUST be set to min(4*MTU, max (2*MTU, 4380
158-
// bytes)).
159-
// TODO: Consider whether this should use `clamp`
160-
#[allow(clippy::manual_clamp)]
161-
{
162-
a.cwnd = std::cmp::min(4 * a.mtu, std::cmp::max(2 * a.mtu, 4380));
163-
}
164196
log::trace!(
165197
"[{}] updated cwnd={} ssthresh={} inflight={} (INI)",
166-
a.name,
167-
a.cwnd,
168-
a.ssthresh,
169-
a.inflight_queue.get_num_bytes()
198+
ret.name,
199+
ret.cwnd,
200+
ret.ssthresh,
201+
ret.inflight_queue.get_num_bytes()
170202
);
171203

172-
a
204+
ret
173205
}
174206

175207
/// caller must hold self.lock
@@ -291,9 +323,7 @@ impl AssociationInternal {
291323

292324
fn awake_write_loop(&self) {
293325
//log::debug!("[{}] awake_write_loop_ch.notify_one", self.name);
294-
if let Some(awake_write_loop_ch) = &self.awake_write_loop_ch {
295-
let _ = awake_write_loop_ch.try_send(());
296-
}
326+
let _ = self.awake_write_loop_ch.try_send(());
297327
}
298328

299329
/// unregister_stream un-registers a stream from the association
@@ -606,7 +636,6 @@ impl AssociationInternal {
606636

607637
async fn handle_init(&mut self, p: &Packet, i: &ChunkInit) -> Result<Vec<Packet>> {
608638
let state = self.get_state();
609-
log::debug!("[{}] chunkInit received in state '{}'", self.name, state);
610639

611640
// https://tools.ietf.org/html/rfc4960#section-5.2.1
612641
// Upon receipt of an INIT in the COOKIE-WAIT state, an endpoint MUST
@@ -619,11 +648,14 @@ impl AssociationInternal {
619648
&& state != AssociationState::CookieWait
620649
&& state != AssociationState::CookieEchoed
621650
{
651+
log::error!("[{}] chunkInit received in state '{}'", self.name, state);
622652
// 5.2.2. Unexpected INIT in States Other than CLOSED, COOKIE-ECHOED,
623653
// COOKIE-WAIT, and SHUTDOWN-ACK-SENT
624654
return Err(Error::ErrHandleInitState);
625655
}
626656

657+
log::debug!("[{}] chunkInit received in state '{}'", self.name, state);
658+
627659
// Should we be setting any of these permanently until we've ACKed further?
628660
self.my_max_num_inbound_streams =
629661
std::cmp::min(i.num_inbound_streams, self.my_max_num_inbound_streams);
@@ -855,9 +887,7 @@ impl AssociationInternal {
855887
self.stored_cookie_echo = None;
856888

857889
self.set_state(AssociationState::Established);
858-
if let Some(handshake_completed_ch) = &self.handshake_completed_ch_tx {
859-
let _ = handshake_completed_ch.send(None).await;
860-
}
890+
let _ = self.handshake_completed_ch_tx.send(None).await;
861891
}
862892
_ => return Ok(vec![]),
863893
};
@@ -891,9 +921,7 @@ impl AssociationInternal {
891921
self.stored_cookie_echo = None;
892922

893923
self.set_state(AssociationState::Established);
894-
if let Some(handshake_completed_ch) = &self.handshake_completed_ch_tx {
895-
let _ = handshake_completed_ch.send(None).await;
896-
}
924+
let _ = self.handshake_completed_ch_tx.send(None).await;
897925

898926
Ok(vec![])
899927
}
@@ -1049,22 +1077,14 @@ impl AssociationInternal {
10491077
));
10501078

10511079
if accept {
1052-
if let Some(accept_ch) = &self.accept_ch_tx {
1053-
if accept_ch.try_send(Arc::clone(&s)).is_ok() {
1054-
log::debug!(
1055-
"[{}] accepted a new stream (streamIdentifier: {})",
1056-
self.name,
1057-
stream_identifier
1058-
);
1059-
} else {
1060-
log::debug!("[{}] dropped a new stream due to accept_ch full", self.name);
1061-
return None;
1062-
}
1063-
} else {
1080+
if self.accept_ch_tx.try_send(Arc::clone(&s)).is_ok() {
10641081
log::debug!(
1065-
"[{}] dropped a new stream due to accept_ch_tx is None",
1066-
self.name
1082+
"[{}] accepted a new stream (streamIdentifier: {})",
1083+
self.name,
1084+
stream_identifier
10671085
);
1086+
} else {
1087+
log::debug!("[{}] dropped a new stream due to accept_ch full", self.name);
10681088
return None;
10691089
}
10701090
}
@@ -2389,19 +2409,17 @@ impl RtxTimerObserver for AssociationInternal {
23892409
match id {
23902410
RtxTimerId::T1Init => {
23912411
log::error!("[{}] retransmission failure: T1-init", self.name);
2392-
if let Some(handshake_completed_ch) = &self.handshake_completed_ch_tx {
2393-
let _ = handshake_completed_ch
2394-
.send(Some(Error::ErrHandshakeInitAck))
2395-
.await;
2396-
}
2412+
let _ = self
2413+
.handshake_completed_ch_tx
2414+
.send(Some(Error::ErrHandshakeInitAck))
2415+
.await;
23972416
}
23982417
RtxTimerId::T1Cookie => {
23992418
log::error!("[{}] retransmission failure: T1-cookie", self.name);
2400-
if let Some(handshake_completed_ch) = &self.handshake_completed_ch_tx {
2401-
let _ = handshake_completed_ch
2402-
.send(Some(Error::ErrHandshakeCookieEcho))
2403-
.await;
2404-
}
2419+
let _ = self
2420+
.handshake_completed_ch_tx
2421+
.send(Some(Error::ErrHandshakeCookieEcho))
2422+
.await;
24052423
}
24062424

24072425
RtxTimerId::T2Shutdown => {

0 commit comments

Comments
 (0)