Skip to content

Commit b5dfb8b

Browse files
authored
[Enhancement]Implement new ICE restart Spec (#869)
1 parent e6552eb commit b5dfb8b

18 files changed

+638
-64
lines changed
Lines changed: 104 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,104 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Combine
6+
import Foundation
7+
import StreamWebRTC
8+
9+
/// An adapter that observes ICE connection state changes from the WebRTC peer
10+
/// connection and schedules reconnection attempts when necessary.
11+
final class ICEConnectionStateAdapter {
12+
13+
/// The peer connection coordinator to observe for ICE state changes.
14+
weak var peerConnectionCoordinator: RTCPeerConnectionCoordinator? {
15+
didSet { didUpdate(peerConnectionCoordinator) }
16+
}
17+
18+
/// A cancellable for the ICE state subscription.
19+
private var peerConnectionICEStateCancellable: AnyCancellable?
20+
21+
/// A cancellable for the scheduled ICE restart timer.
22+
private var scheduledRestartICECancellable: AnyCancellable?
23+
24+
/// The interval after which to restart ICE when disconnected.
25+
private let scheduleICERestartInterval: TimeInterval
26+
27+
/// A serial queue for processing ICE state events.
28+
private let processingQueue = DispatchQueue(
29+
label: "io.getstream.ICEConnectionStateAdapter"
30+
)
31+
32+
/// Creates a new ICEConnectionStateAdapter instance.
33+
///
34+
/// - Parameter scheduleICERestartInterval: The time interval after a
35+
/// disconnection to schedule an ICE restart. Default is 3 seconds.
36+
init(scheduleICERestartInterval: TimeInterval = 3) {
37+
self.scheduleICERestartInterval = scheduleICERestartInterval
38+
}
39+
40+
/// Responds to ICE connection state changes and performs the appropriate
41+
/// action depending on the new state.
42+
///
43+
/// - Parameter state: The current ICE connection state.
44+
/// - If the state is `.failed`, ICE will be restarted immediately.
45+
/// - If the state is `.disconnected`, ICE will be scheduled to restart
46+
/// after a short delay.
47+
/// - If the state is `.connected`, any pending restart will be cancelled.
48+
private func didUpdate(_ state: RTCIceConnectionState) {
49+
switch state {
50+
case .failed:
51+
scheduledRestartICECancellable?.cancel()
52+
scheduledRestartICECancellable = nil
53+
peerConnectionCoordinator?.restartICE()
54+
55+
case .disconnected:
56+
scheduledRestartICECancellable = Foundation
57+
.Timer
58+
.publish(every: scheduleICERestartInterval, on: .main, in: .default)
59+
.autoconnect()
60+
.sink { [weak self] _ in self?.restartICE() }
61+
62+
case .connected:
63+
scheduledRestartICECancellable?.cancel()
64+
scheduledRestartICECancellable = nil
65+
66+
default:
67+
break
68+
}
69+
}
70+
71+
/// Cancels any scheduled ICE restart and restarts ICE immediately.
72+
private func restartICE() {
73+
scheduledRestartICECancellable?.cancel()
74+
scheduledRestartICECancellable = nil
75+
peerConnectionCoordinator?.restartICE()
76+
}
77+
78+
/// Subscribes to ICE connection state events from the coordinator and
79+
/// cancels previous subscriptions when the coordinator changes.
80+
///
81+
/// - Parameter peerConnectionCoordinator: The coordinator to observe.
82+
private func didUpdate(
83+
_ peerConnectionCoordinator: RTCPeerConnectionCoordinator?
84+
) {
85+
guard let peerConnectionCoordinator else {
86+
peerConnectionICEStateCancellable?.cancel()
87+
peerConnectionICEStateCancellable = nil
88+
89+
scheduledRestartICECancellable?.cancel()
90+
scheduledRestartICECancellable = nil
91+
92+
return
93+
}
94+
95+
peerConnectionICEStateCancellable = peerConnectionCoordinator
96+
.eventPublisher
97+
.compactMap { $0 as? StreamRTCPeerConnection.ICEConnectionChangedEvent }
98+
.map(\.state)
99+
.receive(on: processingQueue)
100+
.removeDuplicates()
101+
.log(.debug) { "ICE connection state changed to: \($0)" }
102+
.sink { [weak self] in self?.didUpdate($0) }
103+
}
104+
}

Sources/StreamVideo/WebRTC/v2/PeerConnection/Protocols/StreamRTCPeerConnectionProtocol.swift

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,18 @@ protocol StreamRTCPeerConnectionProtocol: AnyObject, Sendable {
2525
/// A publisher for RTCPeerConnectionEvents.
2626
var publisher: AnyPublisher<RTCPeerConnectionEvent, Never> { get }
2727

28+
/// The current ICE connection state of the peer connection.
29+
///
30+
/// This property reflects the state of the ICE (Interactive Connectivity Establishment) agent,
31+
/// indicating the progress and status of the connection between peers.
32+
var iceConnectionState: RTCIceConnectionState { get }
33+
34+
/// The current signaling state of the peer connection.
35+
///
36+
/// This property reflects the overall state of the peer connection, including the signaling
37+
/// process and the establishment of media channels.
38+
var connectionState: RTCPeerConnectionState { get }
39+
2840
/// Sets the local description asynchronously.
2941
/// - Parameter sessionDescription: The RTCSessionDescription to set as the local description.
3042
/// - Throws: An error if setting the local description fails.

Sources/StreamVideo/WebRTC/v2/PeerConnection/RTCPeerConnectionCoordinator.swift

Lines changed: 35 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
4848
private let mediaAdapter: MediaAdapter
4949
private let iceAdapter: ICEAdapter
5050
private let sfuAdapter: SFUAdapter
51+
private let iceConnectionStateAdapter: ICEConnectionStateAdapter
5152

5253
private var callSettings: CallSettings
5354

@@ -73,12 +74,30 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
7374
var trackPublisher: AnyPublisher<TrackEvent, Never> { mediaAdapter.trackPublisher }
7475
var disconnectedPublisher: AnyPublisher<Void, Never> {
7576
peerConnection
76-
.publisher(eventType: StreamRTCPeerConnection.ICEConnectionChangedEvent.self)
77+
.publisher(eventType: StreamRTCPeerConnection.DidChangeConnectionStateEvent.self)
7778
.filter { $0.state == .disconnected || $0.state == .failed }
7879
.map { _ in () }
7980
.eraseToAnyPublisher()
8081
}
8182

83+
/// A Boolean value indicating whether the peer connection is in a healthy state.
84+
///
85+
/// The peer connection is considered healthy if its ICE connection state is not
86+
/// `.failed` or `.closed`, and its overall connection state is not `.failed` or `.closed`.
87+
/// This property provides a quick way to check if the connection is active and able to
88+
/// send or receive data.
89+
var isHealthy: Bool {
90+
let invalidICEConnectionStates = Set([RTCIceConnectionState.failed, .closed])
91+
let invalidConnectionStates = Set([RTCPeerConnectionState.failed, .closed])
92+
guard
93+
!invalidICEConnectionStates.contains(peerConnection.iceConnectionState),
94+
!invalidConnectionStates.contains(peerConnection.connectionState)
95+
else {
96+
return false
97+
}
98+
return true
99+
}
100+
82101
/// Retrieves track information for a specified track type and collection type.
83102
///
84103
/// - Parameters:
@@ -143,7 +162,14 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
143162
publishOptions: publishOptions,
144163
videoCaptureSessionProvider: videoCaptureSessionProvider,
145164
screenShareSessionProvider: screenShareSessionProvider
146-
)
165+
),
166+
iceAdapter: .init(
167+
sessionID: sessionId,
168+
peerType: peerType,
169+
peerConnection: peerConnection,
170+
sfuAdapter: sfuAdapter
171+
),
172+
iceConnectionStateAdapter: .init()
147173
)
148174
}
149175

@@ -156,7 +182,9 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
156182
audioSettings: AudioSettings,
157183
publishOptions: PublishOptions,
158184
sfuAdapter: SFUAdapter,
159-
mediaAdapter: MediaAdapter
185+
mediaAdapter: MediaAdapter,
186+
iceAdapter: ICEAdapter,
187+
iceConnectionStateAdapter: ICEConnectionStateAdapter
160188
) {
161189
self.sessionId = sessionId
162190
self.peerType = peerType
@@ -170,13 +198,11 @@ class RTCPeerConnectionCoordinator: @unchecked Sendable {
170198
? .peerConnectionPublisher
171199
: .peerConnectionSubscriber
172200
self.mediaAdapter = mediaAdapter
201+
self.iceAdapter = iceAdapter
202+
self.iceConnectionStateAdapter = iceConnectionStateAdapter
173203

174-
iceAdapter = .init(
175-
sessionID: sessionId,
176-
peerType: peerType,
177-
peerConnection: peerConnection,
178-
sfuAdapter: sfuAdapter
179-
)
204+
// Warm up instances
205+
iceConnectionStateAdapter.peerConnectionCoordinator = self
180206

181207
// Start ICERestart events observation
182208
observeICERestartEvents()

Sources/StreamVideo/WebRTC/v2/PeerConnection/StreamRTCPeerConnection.swift

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,10 @@ final class StreamRTCPeerConnection: StreamRTCPeerConnectionProtocol, @unchecked
3838
.receive(on: dispatchQueue)
3939
.eraseToAnyPublisher()
4040

41+
var iceConnectionState: RTCIceConnectionState { source.iceConnectionState }
42+
43+
var connectionState: RTCPeerConnectionState { source.connectionState }
44+
4145
private let delegatePublisher = DelegatePublisher()
4246
private let source: RTCPeerConnection
4347

Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Disconnected.swift

Lines changed: 48 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -143,14 +143,22 @@ extension WebRTCCoordinator.StateMachine.Stage {
143143
}
144144

145145
/// Attempts to reconnect based on the current reconnection strategy.
146-
private func reconnect() {
146+
private func reconnect() async {
147147
do {
148148
switch context.reconnectionStrategy {
149-
case let .fast(disconnectedSince, deadline) where abs(disconnectedSince.timeIntervalSinceNow) <= deadline:
150-
try transition?(.fastReconnecting(context))
151-
case .fast, .rejoin:
149+
case let .fast(disconnectedSince, deadline):
150+
if await isFastReconnectPossible(disconnectedSince: disconnectedSince, deadline: deadline) {
151+
context.fastReconnectionAttempts += 1
152+
try transition?(.fastReconnecting(context))
153+
} else {
154+
context.fastReconnectionAttempts = 0
155+
try transition?(.rejoining(context))
156+
}
157+
case .rejoin:
158+
context.fastReconnectionAttempts = 0
152159
try transition?(.rejoining(context))
153160
case .migrate:
161+
context.fastReconnectionAttempts = 0
154162
try transition?(.migrating(context))
155163
case .unknown:
156164
if let error = context.flowError {
@@ -191,7 +199,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
191199
.trace(.init(status: $0))
192200

193201
if $0.isAvailable {
194-
self?.reconnect()
202+
await self?.reconnect()
195203
}
196204
}
197205
}
@@ -239,5 +247,40 @@ extension WebRTCCoordinator.StateMachine.Stage {
239247
private func didTimeInStageExpired() {
240248
transitionErrorOrLog(ClientError.NetworkNotAvailable())
241249
}
250+
251+
/// Checks if a fast reconnect is possible based on several conditions.
252+
///
253+
/// A fast reconnect is a lightweight process to restore a connection without
254+
/// going through the full rejoin flow. This method evaluates whether the current
255+
/// state of the coordinator allows for such a reconnection.
256+
///
257+
/// - Parameters:
258+
/// - disconnectedSince: The `Date` when the disconnection occurred.
259+
/// - deadline: The `TimeInterval` within which a fast reconnect must be
260+
/// initiated.
261+
/// - Returns: `true` if a fast reconnect is possible, `false` otherwise.
262+
private func isFastReconnectPossible(
263+
disconnectedSince: Date,
264+
deadline: TimeInterval
265+
) async -> Bool {
266+
guard
267+
// Ensure we haven't exceeded the maximum number of fast reconnection attempts.
268+
context.fastReconnectionAttempts < context.fastReconnectionMaxAttempts,
269+
// Check if the time since disconnection is within the allowed deadline.
270+
abs(disconnectedSince.timeIntervalSinceNow) <= deadline,
271+
// Verify that the WebRTC publisher is available and in a healthy state.
272+
let publisher = await context.coordinator?.stateAdapter.publisher,
273+
publisher.isHealthy,
274+
// Verify that the WebRTC subscriber is available and in a healthy state.
275+
let subscriber = await context.coordinator?.stateAdapter.subscriber,
276+
subscriber.isHealthy
277+
else {
278+
// If any of the conditions are not met, fast reconnect is not possible.
279+
return false
280+
}
281+
282+
// All conditions are met, so a fast reconnect is possible.
283+
return true
284+
}
242285
}
243286
}

0 commit comments

Comments
 (0)