Skip to content

Commit 54b5843

Browse files
authored
peer_connection: Implement state changes after spec and fix race (#598)
* peer_connection: Implement state changes after spec https://www.w3.org/TR/webrtc/#rtcpeerconnectionstate-enum Ported from PION pion/webrtc#2435 * peer_connection: Fix state transition race DTLS transport state was previously acquired and sent over to the async context returned. This would in some rare cases obviously lead to updating peer connection state with an old DTLS transport state. This would in turn lead to PeerConnection not updating it's state correctly. Sometimes we would see PeerConnection never reaching Connected. * peer_connection: Initialize PeerConnectionInternal correctly PeerConnectionInternal was initialized with default transports and then mutated right after. This resulted in create_ice_transport() using the default DTLS transport instance which again led to state transition reading DTLS transport state from the wrong instance.
1 parent fa42cad commit 54b5843

File tree

3 files changed

+197
-87
lines changed

3 files changed

+197
-87
lines changed

webrtc/src/peer_connection/mod.rs

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -884,14 +884,21 @@ impl RTCPeerConnection {
884884
// Any of the RTCIceTransports or RTCDtlsTransports are in the "disconnected"
885885
// state and none of them are in the "failed" or "connecting" or "checking" state.
886886
RTCPeerConnectionState::Disconnected
887-
} else if ice_connection_state == RTCIceConnectionState::Connected && dtls_transport_state == RTCDtlsTransportState::Connected {
887+
} else if (ice_connection_state == RTCIceConnectionState::New || ice_connection_state == RTCIceConnectionState::Closed) &&
888+
(dtls_transport_state == RTCDtlsTransportState::New || dtls_transport_state == RTCDtlsTransportState::Closed) {
889+
// None of the previous states apply and all RTCIceTransports are in the "new" or "closed" state,
890+
// and all RTCDtlsTransports are in the "new" or "closed" state, or there are no transports.
891+
RTCPeerConnectionState::New
892+
} else if (ice_connection_state == RTCIceConnectionState::New || ice_connection_state == RTCIceConnectionState::Checking) ||
893+
(dtls_transport_state == RTCDtlsTransportState::New || dtls_transport_state == RTCDtlsTransportState::Connecting) {
894+
// None of the previous states apply and any RTCIceTransport is in the "new" or "checking" state or
895+
// any RTCDtlsTransport is in the "new" or "connecting" state.
896+
RTCPeerConnectionState::Connecting
897+
} else if (ice_connection_state == RTCIceConnectionState::Connected || ice_connection_state == RTCIceConnectionState::Completed || ice_connection_state == RTCIceConnectionState::Closed) &&
898+
(dtls_transport_state == RTCDtlsTransportState::Connected || dtls_transport_state == RTCDtlsTransportState::Closed) {
888899
// All RTCIceTransports and RTCDtlsTransports are in the "connected", "completed" or "closed"
889-
// state and at least one of them is in the "connected" or "completed" state.
900+
// state and all RTCDtlsTransports are in the "connected" or "closed" state.
890901
RTCPeerConnectionState::Connected
891-
} else if ice_connection_state == RTCIceConnectionState::Checking && dtls_transport_state == RTCDtlsTransportState::Connecting {
892-
// Any of the RTCIceTransports or RTCDtlsTransports are in the "connecting" or
893-
// "checking" state and none of them is in the "failed" state.
894-
RTCPeerConnectionState::Connecting
895902
} else {
896903
RTCPeerConnectionState::New
897904
};

webrtc/src/peer_connection/peer_connection_internal.rs

Lines changed: 79 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -77,7 +77,24 @@ impl PeerConnectionInternal {
7777
stats_interceptor: Arc<stats::StatsInterceptor>,
7878
mut configuration: RTCConfiguration,
7979
) -> Result<(Arc<Self>, RTCConfiguration)> {
80-
let mut pc = PeerConnectionInternal {
80+
// Create the ice gatherer
81+
let ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
82+
ice_servers: configuration.get_ice_servers(),
83+
ice_gather_policy: configuration.ice_transport_policy,
84+
})?);
85+
86+
// Create the ICE transport
87+
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&ice_gatherer)));
88+
89+
// Create the DTLS transport
90+
let certificates = configuration.certificates.drain(..).collect();
91+
let dtls_transport =
92+
Arc::new(api.new_dtls_transport(Arc::clone(&ice_transport), certificates)?);
93+
94+
// Create the SCTP transport
95+
let sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&dtls_transport))?);
96+
97+
let pc = Arc::new(PeerConnectionInternal {
8198
greater_mid: AtomicIsize::new(-1),
8299
sdp_origin: Mutex::new(Default::default()),
83100
last_offer: Mutex::new("".to_owned()),
@@ -89,16 +106,16 @@ impl PeerConnectionInternal {
89106
is_negotiation_needed: Arc::new(AtomicBool::new(false)),
90107
negotiation_needed_state: Arc::new(AtomicU8::new(NegotiationNeededState::Empty as u8)),
91108
signaling_state: Arc::new(AtomicU8::new(RTCSignalingState::Stable as u8)),
92-
ice_transport: Arc::new(Default::default()),
93-
dtls_transport: Arc::new(Default::default()),
109+
ice_transport,
110+
dtls_transport,
94111
ice_connection_state: Arc::new(AtomicU8::new(RTCIceConnectionState::New as u8)),
95-
sctp_transport: Arc::new(Default::default()),
112+
sctp_transport,
96113
rtp_transceivers: Arc::new(Default::default()),
97114
on_track_handler: Arc::new(ArcSwapOption::empty()),
98115
on_signaling_state_change_handler: ArcSwapOption::empty(),
99116
on_ice_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
100117
on_data_channel_handler: Arc::new(Default::default()),
101-
ice_gatherer: Arc::new(Default::default()),
118+
ice_gatherer,
102119
current_local_description: Arc::new(Default::default()),
103120
current_remote_description: Arc::new(Default::default()),
104121
pending_local_description: Arc::new(Default::default()),
@@ -114,39 +131,77 @@ impl PeerConnectionInternal {
114131
stats_interceptor,
115132
on_peer_connection_state_change_handler: Arc::new(ArcSwapOption::empty()),
116133
pending_remote_description: Arc::new(Default::default()),
117-
};
118-
119-
// Create the ice gatherer
120-
pc.ice_gatherer = Arc::new(api.new_ice_gatherer(RTCIceGatherOptions {
121-
ice_servers: configuration.get_ice_servers(),
122-
ice_gather_policy: configuration.ice_transport_policy,
123-
})?);
124-
125-
// Create the ice transport
126-
pc.ice_transport = pc.create_ice_transport(api).await;
134+
});
127135

128-
// Create the DTLS transport
129-
let certificates = configuration.certificates.drain(..).collect();
130-
pc.dtls_transport =
131-
Arc::new(api.new_dtls_transport(Arc::clone(&pc.ice_transport), certificates)?);
136+
// Wire up the ice transport connection state change handler
137+
let ice_connection_state = Arc::clone(&pc.ice_connection_state);
138+
let peer_connection_state = Arc::clone(&pc.peer_connection_state);
139+
let is_closed = Arc::clone(&pc.is_closed);
140+
let dtls_transport = Arc::clone(&pc.dtls_transport);
141+
let on_ice_connection_state_change_handler =
142+
Arc::clone(&pc.on_ice_connection_state_change_handler);
143+
let on_peer_connection_state_change_handler =
144+
Arc::clone(&pc.on_peer_connection_state_change_handler);
145+
146+
pc.ice_transport.on_connection_state_change(Box::new(
147+
move |state: RTCIceTransportState| {
148+
let cs = match state {
149+
RTCIceTransportState::New => RTCIceConnectionState::New,
150+
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
151+
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
152+
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
153+
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
154+
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
155+
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
156+
_ => {
157+
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
158+
return Box::pin(async {});
159+
}
160+
};
132161

133-
// Create the SCTP transport
134-
pc.sctp_transport = Arc::new(api.new_sctp_transport(Arc::clone(&pc.dtls_transport))?);
162+
let dtls_transport = Arc::clone(&dtls_transport);
163+
let ice_connection_state = Arc::clone(&ice_connection_state);
164+
let on_ice_connection_state_change_handler =
165+
Arc::clone(&on_ice_connection_state_change_handler);
166+
let on_peer_connection_state_change_handler =
167+
Arc::clone(&on_peer_connection_state_change_handler);
168+
let is_closed = Arc::clone(&is_closed);
169+
let peer_connection_state = Arc::clone(&peer_connection_state);
170+
Box::pin(async move {
171+
RTCPeerConnection::do_ice_connection_state_change(
172+
&on_ice_connection_state_change_handler,
173+
&ice_connection_state,
174+
cs,
175+
)
176+
.await;
177+
178+
let dtls_transport_state = dtls_transport.state();
179+
RTCPeerConnection::update_connection_state(
180+
&on_peer_connection_state_change_handler,
181+
&is_closed,
182+
&peer_connection_state,
183+
cs,
184+
dtls_transport_state,
185+
)
186+
.await;
187+
})
188+
},
189+
));
135190

136191
// Wire up the on datachannel handler
137192
let on_data_channel_handler = Arc::clone(&pc.on_data_channel_handler);
138193
pc.sctp_transport
139194
.on_data_channel(Box::new(move |d: Arc<RTCDataChannel>| {
140-
let on_data_channel_handler2 = Arc::clone(&on_data_channel_handler);
195+
let on_data_channel_handler = Arc::clone(&on_data_channel_handler);
141196
Box::pin(async move {
142-
if let Some(handler) = &*on_data_channel_handler2.load() {
197+
if let Some(handler) = &*on_data_channel_handler.load() {
143198
let mut f = handler.lock().await;
144199
f(d).await;
145200
}
146201
})
147202
}));
148203

149-
Ok((Arc::new(pc), configuration))
204+
Ok((pc, configuration))
150205
}
151206

152207
pub(super) async fn start_rtp(
@@ -1141,63 +1196,6 @@ impl PeerConnectionInternal {
11411196
}
11421197
}
11431198

1144-
pub(super) async fn create_ice_transport(&self, api: &API) -> Arc<RTCIceTransport> {
1145-
let ice_transport = Arc::new(api.new_ice_transport(Arc::clone(&self.ice_gatherer)));
1146-
1147-
let ice_connection_state = Arc::clone(&self.ice_connection_state);
1148-
let peer_connection_state = Arc::clone(&self.peer_connection_state);
1149-
let is_closed = Arc::clone(&self.is_closed);
1150-
let dtls_transport = Arc::clone(&self.dtls_transport);
1151-
let on_ice_connection_state_change_handler =
1152-
Arc::clone(&self.on_ice_connection_state_change_handler);
1153-
let on_peer_connection_state_change_handler =
1154-
Arc::clone(&self.on_peer_connection_state_change_handler);
1155-
1156-
ice_transport.on_connection_state_change(Box::new(move |state: RTCIceTransportState| {
1157-
let cs = match state {
1158-
RTCIceTransportState::New => RTCIceConnectionState::New,
1159-
RTCIceTransportState::Checking => RTCIceConnectionState::Checking,
1160-
RTCIceTransportState::Connected => RTCIceConnectionState::Connected,
1161-
RTCIceTransportState::Completed => RTCIceConnectionState::Completed,
1162-
RTCIceTransportState::Failed => RTCIceConnectionState::Failed,
1163-
RTCIceTransportState::Disconnected => RTCIceConnectionState::Disconnected,
1164-
RTCIceTransportState::Closed => RTCIceConnectionState::Closed,
1165-
_ => {
1166-
log::warn!("on_connection_state_change: unhandled ICE state: {}", state);
1167-
return Box::pin(async {});
1168-
}
1169-
};
1170-
1171-
let ice_connection_state2 = Arc::clone(&ice_connection_state);
1172-
let on_ice_connection_state_change_handler2 =
1173-
Arc::clone(&on_ice_connection_state_change_handler);
1174-
let on_peer_connection_state_change_handler2 =
1175-
Arc::clone(&on_peer_connection_state_change_handler);
1176-
let is_closed2 = Arc::clone(&is_closed);
1177-
let dtls_transport_state = dtls_transport.state();
1178-
let peer_connection_state2 = Arc::clone(&peer_connection_state);
1179-
Box::pin(async move {
1180-
RTCPeerConnection::do_ice_connection_state_change(
1181-
&on_ice_connection_state_change_handler2,
1182-
&ice_connection_state2,
1183-
cs,
1184-
)
1185-
.await;
1186-
1187-
RTCPeerConnection::update_connection_state(
1188-
&on_peer_connection_state_change_handler2,
1189-
&is_closed2,
1190-
&peer_connection_state2,
1191-
cs,
1192-
dtls_transport_state,
1193-
)
1194-
.await;
1195-
})
1196-
}));
1197-
1198-
ice_transport
1199-
}
1200-
12011199
/// has_local_description_changed returns whether local media (rtp_transceivers) has changed
12021200
/// caller of this method should hold `pc.mu` lock
12031201
pub(super) async fn has_local_description_changed(&self, desc: &RTCSessionDescription) -> bool {

webrtc/src/peer_connection/peer_connection_test.rs

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -635,3 +635,108 @@ async fn test_peer_connection_simulcast_no_data_channel() -> Result<()> {
635635

636636
Ok(())
637637
}
638+
639+
#[tokio::test]
640+
async fn test_peer_connection_state() -> Result<()> {
641+
let mut m = MediaEngine::default();
642+
m.register_default_codecs()?;
643+
let api = APIBuilder::new().with_media_engine(m).build();
644+
let pc = api.new_peer_connection(RTCConfiguration::default()).await?;
645+
646+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::New);
647+
648+
RTCPeerConnection::update_connection_state(
649+
&pc.internal.on_peer_connection_state_change_handler,
650+
&pc.internal.is_closed,
651+
&pc.internal.peer_connection_state,
652+
RTCIceConnectionState::Checking,
653+
RTCDtlsTransportState::New,
654+
)
655+
.await;
656+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);
657+
658+
RTCPeerConnection::update_connection_state(
659+
&pc.internal.on_peer_connection_state_change_handler,
660+
&pc.internal.is_closed,
661+
&pc.internal.peer_connection_state,
662+
RTCIceConnectionState::Connected,
663+
RTCDtlsTransportState::New,
664+
)
665+
.await;
666+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);
667+
668+
RTCPeerConnection::update_connection_state(
669+
&pc.internal.on_peer_connection_state_change_handler,
670+
&pc.internal.is_closed,
671+
&pc.internal.peer_connection_state,
672+
RTCIceConnectionState::Connected,
673+
RTCDtlsTransportState::Connecting,
674+
)
675+
.await;
676+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connecting);
677+
678+
RTCPeerConnection::update_connection_state(
679+
&pc.internal.on_peer_connection_state_change_handler,
680+
&pc.internal.is_closed,
681+
&pc.internal.peer_connection_state,
682+
RTCIceConnectionState::Connected,
683+
RTCDtlsTransportState::Connected,
684+
)
685+
.await;
686+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);
687+
688+
RTCPeerConnection::update_connection_state(
689+
&pc.internal.on_peer_connection_state_change_handler,
690+
&pc.internal.is_closed,
691+
&pc.internal.peer_connection_state,
692+
RTCIceConnectionState::Completed,
693+
RTCDtlsTransportState::Connected,
694+
)
695+
.await;
696+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);
697+
698+
RTCPeerConnection::update_connection_state(
699+
&pc.internal.on_peer_connection_state_change_handler,
700+
&pc.internal.is_closed,
701+
&pc.internal.peer_connection_state,
702+
RTCIceConnectionState::Connected,
703+
RTCDtlsTransportState::Closed,
704+
)
705+
.await;
706+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Connected);
707+
708+
RTCPeerConnection::update_connection_state(
709+
&pc.internal.on_peer_connection_state_change_handler,
710+
&pc.internal.is_closed,
711+
&pc.internal.peer_connection_state,
712+
RTCIceConnectionState::Disconnected,
713+
RTCDtlsTransportState::Connected,
714+
)
715+
.await;
716+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Disconnected);
717+
718+
RTCPeerConnection::update_connection_state(
719+
&pc.internal.on_peer_connection_state_change_handler,
720+
&pc.internal.is_closed,
721+
&pc.internal.peer_connection_state,
722+
RTCIceConnectionState::Failed,
723+
RTCDtlsTransportState::Connected,
724+
)
725+
.await;
726+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Failed);
727+
728+
RTCPeerConnection::update_connection_state(
729+
&pc.internal.on_peer_connection_state_change_handler,
730+
&pc.internal.is_closed,
731+
&pc.internal.peer_connection_state,
732+
RTCIceConnectionState::Connected,
733+
RTCDtlsTransportState::Failed,
734+
)
735+
.await;
736+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Failed);
737+
738+
pc.close().await?;
739+
assert_eq!(pc.connection_state(), RTCPeerConnectionState::Closed);
740+
741+
Ok(())
742+
}

0 commit comments

Comments
 (0)