diff --git a/Sources/StreamVideo/WebRTC/v2/PeerConnection/StreamRTCPeerConnection.swift b/Sources/StreamVideo/WebRTC/v2/PeerConnection/StreamRTCPeerConnection.swift index c95a298b6..e42cd6821 100644 --- a/Sources/StreamVideo/WebRTC/v2/PeerConnection/StreamRTCPeerConnection.swift +++ b/Sources/StreamVideo/WebRTC/v2/PeerConnection/StreamRTCPeerConnection.swift @@ -29,6 +29,9 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked /// A dispatch queue for handling peer connection operations. let dispatchQueue = DispatchQueue(label: "io.getstream.peerconnection") + + /// A dispatch queue for safely accessing `source`. RTCPeerConnection is not thread-safe. + private let connectionQueue = DispatchQueue(label: "io.getstream.peerconnection.connection") /// A publisher for RTCPeerConnectionEvents. lazy var publisher: AnyPublisher = delegatePublisher @@ -72,7 +75,6 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked /// /// - Parameter sessionDescription: The RTCSessionDescription to set as the local description. /// - Throws: An error if setting the local description fails. - @MainActor func setLocalDescription( _ sessionDescription: RTCSessionDescription ) async throws { @@ -84,11 +86,14 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked return } - source.setLocalDescription(sessionDescription) { error in - if let error = error { - continuation.resume(throwing: error) - } else { - continuation.resume(returning: ()) + connectionQueue.async { [weak self] in + guard let self else { return } + source.setLocalDescription(sessionDescription) { error in + if let error = error { + continuation.resume(throwing: error) + } else { + continuation.resume(returning: ()) + } } } } as () @@ -98,7 +103,6 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked /// /// - Parameter sessionDescription: The RTCSessionDescription to set as the remote description. /// - Throws: An error if setting the remote description fails. - @MainActor func setRemoteDescription( _ sessionDescription: RTCSessionDescription ) async throws { @@ -110,12 +114,15 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked return } - source.setRemoteDescription(sessionDescription) { error in - if let error = error { - continuation.resume(throwing: error) - } else { - self.subject.send(HasRemoteDescription(sessionDescription: sessionDescription)) - continuation.resume(returning: ()) + connectionQueue.async { [weak self] in + guard let self else { return } + source.setRemoteDescription(sessionDescription) { error in + if let error = error { + continuation.resume(throwing: error) + } else { + self.subject.send(HasRemoteDescription(sessionDescription: sessionDescription)) + continuation.resume(returning: ()) + } } } } as () @@ -164,8 +171,13 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked with track: RTCMediaStreamTrack, init transceiverInit: RTCRtpTransceiverInit ) -> RTCRtpTransceiver? { - let result = source.addTransceiver(with: track, init: transceiverInit) - storeTransceiver(result, trackType: trackType) + var result: RTCRtpTransceiver? + connectionQueue.sync { + result = source.addTransceiver(with: track, init: transceiverInit) + } + if let result { + storeTransceiver(result, trackType: trackType) + } return result } @@ -197,18 +209,25 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked /// Restarts the ICE gathering process. func restartIce() { - source.restartIce() + connectionQueue.async { [weak self] in + guard let self else { return } + self.source.restartIce() + } } /// Closes the peer connection. func close() async { - Task { @MainActor in - /// It's very important to close any transceivers **before** we close the connection, to make - /// sure that access to `RTCVideoTrack` properties, will be handled correctly. Otherwise - /// if we try to access any property/method on a `RTCVideoTrack` instance whose - /// peerConnection has closed, we will get blocked on the Main Thread. - source.transceivers.forEach { $0.stopInternal() } - source.close() + await withCheckedContinuation { continuation in + connectionQueue.async { [weak self] in + guard let self else { return } + /// It's very important to close any transceivers **before** we close the connection, to make + /// sure that access to `RTCVideoTrack` properties, will be handled correctly. Otherwise + /// if we try to access any property/method on a `RTCVideoTrack` instance whose + /// peerConnection has closed, we will get blocked on the Main Thread. + self.source.transceivers.forEach { $0.stopInternal() } + self.source.close() + continuation.resume() + } } }