Skip to content

Commit 5c2224c

Browse files
authored
[Fix]Unnecessary calls to updateSubscriptions (#795)
1 parent e8fe9f3 commit 5c2224c

File tree

10 files changed

+357
-116
lines changed

10 files changed

+357
-116
lines changed
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Combine
6+
import Foundation
7+
8+
#if compiler(>=6.0)
9+
extension CurrentValueSubject: @retroactive @unchecked Sendable {}
10+
extension PassthroughSubject: @retroactive @unchecked Sendable {}
11+
#else
12+
extension CurrentValueSubject: @unchecked Sendable {}
13+
extension PassthroughSubject: @unchecked Sendable {}
14+
#endif

Sources/StreamVideo/WebRTC/v2/SFU/SFUAdapter.swift

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -476,7 +476,11 @@ final class SFUAdapter: ConnectionStateDelegate, CustomStringConvertible, @unche
476476

477477
try Task.checkCancellation()
478478

479-
log.debug(request, subsystems: .sfu)
479+
log
480+
.debug(
481+
"Request sessionId:\(sessionId) tracks:\(tracks.map { "\($0.userID):\($0.sessionID):\($0.trackType.rawValue):\($0.dimension.width)x\($0.dimension.height)" }.sorted())",
482+
subsystems: .sfu
483+
)
480484
let task = Task { [request, signalService] in
481485
try Task.checkCancellation()
482486
return try await executeTask(retryPolicy: .neverGonnaGiveYouUp { true }) {

Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Joined.swift

Lines changed: 22 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
3131
@Injected(\.internetConnectionObserver) private var internetConnectionObserver
3232

3333
private let disposableBag = DisposableBag()
34+
private var updateSubscriptionsAdapter: WebRTCUpdateSubscriptionsAdapter?
3435

3536
/// Initializes a new instance of `JoinedStage`.
3637
/// - Parameter context: The context for the joined stage.
@@ -97,10 +98,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
9798

9899
try Task.checkCancellation()
99100

100-
await observeForSubscriptionUpdates()
101-
102-
try Task.checkCancellation()
103-
104101
await observeConnection()
105102

106103
try Task.checkCancellation()
@@ -121,10 +118,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
121118

122119
try Task.checkCancellation()
123120

124-
await observeIncomingVideoQualitySettingsUpdates()
125-
126-
try Task.checkCancellation()
127-
128121
await observeCallSettingsUpdates()
129122

130123
try Task.checkCancellation()
@@ -134,6 +127,10 @@ extension WebRTCCoordinator.StateMachine.Stage {
134127
try Task.checkCancellation()
135128

136129
await configureStatsCollectionAndDelivery()
130+
131+
try Task.checkCancellation()
132+
133+
await configureUpdateSubscriptions()
137134
} catch {
138135
await cleanUpPreviousSessionIfRequired()
139136
transitionDisconnectOrError(error)
@@ -327,24 +324,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
327324
.store(in: disposableBag)
328325
}
329326

330-
/// Observes changes to the list of participants and triggers subscription
331-
/// updates when the participant list changes. This ensures that subscriptions
332-
/// are kept in sync with the current participants.
333-
private func observeForSubscriptionUpdates() async {
334-
guard
335-
let stateAdapter = context.coordinator?.stateAdapter
336-
else {
337-
return
338-
}
339-
340-
await stateAdapter
341-
.$participants
342-
.removeDuplicates()
343-
.log(.debug) { "\($0.count) Participants updated and we update subscriptions now." }
344-
.sinkTask(storeIn: disposableBag) { [weak self] _ in await self?.updateSubscriptions() }
345-
.store(in: disposableBag) // Store the Combine subscription in the disposable bag.
346-
}
347-
348327
/// Observes updates to the `callSettings` and ensures that any changes are
349328
/// reflected in the publisher. This ensures that updates to audio, video, and
350329
/// audio output settings are applied correctly during a WebRTC session.
@@ -423,30 +402,6 @@ extension WebRTCCoordinator.StateMachine.Stage {
423402
.store(in: disposableBag)
424403
}
425404

426-
/// Observes changes to the `incomingVideoQualitySettings` and triggers updates when
427-
/// the settings change. It ensures that the video subscriptions are updated
428-
/// accordingly and forces a participant update to refresh the UI.
429-
private func observeIncomingVideoQualitySettingsUpdates() async {
430-
guard
431-
let stateAdapter = context.coordinator?.stateAdapter
432-
else {
433-
return
434-
}
435-
436-
await stateAdapter
437-
.$incomingVideoQualitySettings
438-
.removeDuplicates()
439-
.log(.debug) { "Incoming video quality settings updated \($0) and we update subscriptions now." }
440-
.sinkTask(storeIn: disposableBag) { [weak self] _ in
441-
guard let self else { return }
442-
443-
await updateSubscriptions()
444-
/// Force a participant update to ensure the UI reflects the new policy.
445-
await stateAdapter.enqueue { $0 }
446-
}
447-
.store(in: disposableBag)
448-
}
449-
450405
/// Configures the collection and delivery of WebRTC statistics by setting up
451406
/// or updating the `WebRTCStatsReporter` for the current session. This ensures
452407
/// that statistics such as network quality, peer connection status, and SFU
@@ -527,58 +482,29 @@ extension WebRTCCoordinator.StateMachine.Stage {
527482
.store(in: disposableBag)
528483
}
529484

530-
// MARK: - Private helpers
531-
532-
/// Updates the WebRTC subscriptions based on the current state, including the
533-
/// incoming video policy and the list of participants. The method communicates
534-
/// with the SFU (Selective Forwarding Unit) adapter to adjust track subscriptions.
485+
/// Configures the subscription adapter responsible for managing WebRTC
486+
/// track subscriptions.
535487
///
536-
/// - Throws: An error if the subscriptions cannot be updated or if the task
537-
/// is cancelled during execution.
538-
private func updateSubscriptions() async {
488+
/// This function initializes the `WebRTCUpdateSubscriptionsAdapter` using
489+
/// the current participants and incoming video quality settings. It ensures
490+
/// that subscription updates are properly set up for the active SFU adapter
491+
/// and session.
492+
private func configureUpdateSubscriptions() async {
539493
guard
540-
let coordinator = context.coordinator,
541-
let sfuAdapter = await coordinator.stateAdapter.sfuAdapter
494+
let stateAdapter = context.coordinator?.stateAdapter,
495+
let sfuAdapter = await stateAdapter.sfuAdapter
542496
else {
543497
return
544498
}
545499

546-
let incomingVideoQualitySettings = await coordinator
547-
.stateAdapter
548-
.incomingVideoQualitySettings
549-
550-
let tracks = await WebRTCJoinRequestFactory()
551-
.buildSubscriptionDetails(
552-
nil,
553-
coordinator: coordinator,
554-
incomingVideoQualitySettings: incomingVideoQualitySettings
555-
)
556-
557-
do {
558-
try Task.checkCancellation()
559-
560-
let participants = await coordinator.stateAdapter.participants
561-
562-
log.debug(
563-
"""
564-
Updating subscriptions for \(participants.count - 1) participants
565-
with incomingVideoQualitySettings: \(incomingVideoQualitySettings).
566-
""",
567-
subsystems: .webRTC
568-
)
569-
570-
try await sfuAdapter.updateSubscriptions(
571-
tracks: tracks,
572-
for: await coordinator.stateAdapter.sessionID
573-
)
574-
} catch {
575-
log.warning(
576-
"""
577-
UpdateSubscriptions failed with error:\(error).
578-
""",
579-
subsystems: .webRTC
580-
)
581-
}
500+
updateSubscriptionsAdapter = .init(
501+
participantsPublisher: await stateAdapter.$participants.eraseToAnyPublisher(),
502+
incomingVideoQualitySettingsPublisher: await stateAdapter
503+
.$incomingVideoQualitySettings
504+
.eraseToAnyPublisher(),
505+
sfuAdapter: sfuAdapter,
506+
sessionID: await stateAdapter.sessionID
507+
)
582508
}
583509
}
584510
}
Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Combine
6+
import Foundation
7+
8+
/// An adapter responsible for managing WebRTC subscription updates.
9+
///
10+
/// This class listens to participants and incoming video quality settings,
11+
/// computes the necessary subscription details, and instructs the SFUAdapter
12+
/// to update subscriptions accordingly. It ensures updates are sent only
13+
/// when there are meaningful changes.
14+
final class WebRTCUpdateSubscriptionsAdapter: @unchecked Sendable {
15+
/// The session identifier for the current WebRTC session.
16+
private let sessionID: String
17+
/// The adapter used to communicate with the SFU for updates.
18+
private let sfuAdapter: SFUAdapter
19+
/// A serial queue used to process update tasks in order.
20+
private let processingQueue = SerialActorQueue()
21+
/// A factory that builds subscription details for WebRTC tracks.
22+
private let tracksFactory: WebRTCJoinRequestFactory = .init()
23+
/// A container for cancellable Combine subscriptions.
24+
private let disposableBag = DisposableBag()
25+
/// The active Combine subscription observing participants and settings.
26+
private var observable: AnyCancellable?
27+
28+
/// Stores the last set of track subscription details sent to the SFU.
29+
private var lastTrackSubscriptionDetails:
30+
[Stream_Video_Sfu_Signal_TrackSubscriptionDetails] = []
31+
32+
/// Initializes the adapter with publishers and required dependencies.
33+
///
34+
/// - Parameters:
35+
/// - participantsPublisher: A publisher emitting current participants.
36+
/// - incomingVideoQualitySettingsPublisher: A publisher emitting
37+
/// video quality settings.
38+
/// - sfuAdapter: The SFU adapter to send updates to.
39+
/// - sessionID: The identifier for the session.
40+
init(
41+
participantsPublisher: AnyPublisher<
42+
WebRTCStateAdapter.ParticipantsStorage, Never
43+
>,
44+
incomingVideoQualitySettingsPublisher: AnyPublisher<
45+
IncomingVideoQualitySettings, Never
46+
>,
47+
sfuAdapter: SFUAdapter,
48+
sessionID: String
49+
) {
50+
self.sessionID = sessionID
51+
self.sfuAdapter = sfuAdapter
52+
observable = Publishers.CombineLatest(
53+
participantsPublisher,
54+
incomingVideoQualitySettingsPublisher
55+
)
56+
.sink { [weak self] in self?.didUpdate(
57+
participants: $0,
58+
incomingVideoQualitySettings: $1
59+
) }
60+
}
61+
62+
deinit {
63+
processingQueue.cancelAll()
64+
}
65+
66+
// MARK: - Private Helpers
67+
68+
/// Handles updates when participants or quality settings change.
69+
///
70+
/// This function builds the new subscription details, compares them to
71+
/// the last known state, and triggers an update if they differ.
72+
///
73+
/// - Parameters:
74+
/// - participants: The current storage of participants.
75+
/// - incomingVideoQualitySettings: The current video quality settings.
76+
private func didUpdate(
77+
participants: WebRTCStateAdapter.ParticipantsStorage,
78+
incomingVideoQualitySettings: IncomingVideoQualitySettings
79+
) {
80+
processingQueue.async { [weak self] in
81+
guard let self else {
82+
return
83+
}
84+
85+
let tracks = tracksFactory.buildSubscriptionDetails(
86+
nil,
87+
sessionID: sessionID,
88+
participants: Array(participants.values),
89+
incomingVideoQualitySettings: incomingVideoQualitySettings
90+
)
91+
.filter { $0.trackType != .audio }
92+
93+
let setTracks = Set(tracks)
94+
let setLastTrackSubscriptionDetails =
95+
Set(lastTrackSubscriptionDetails)
96+
97+
guard setTracks != setLastTrackSubscriptionDetails else {
98+
return
99+
}
100+
101+
do {
102+
try Task.checkCancellation()
103+
try await sfuAdapter.updateSubscriptions(
104+
tracks: tracks,
105+
for: sessionID
106+
)
107+
lastTrackSubscriptionDetails = tracks
108+
} catch {
109+
log.warning(
110+
"UpdateSubscriptions failed with error:\(error).",
111+
subsystems: .webRTC
112+
)
113+
}
114+
}
115+
}
116+
}

Sources/StreamVideo/WebRTC/v2/WebRTCJoinRequestFactory.swift

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ struct WebRTCJoinRequestFactory {
110110
)
111111
result.subscriptions = await buildSubscriptionDetails(
112112
nil,
113-
coordinator: coordinator,
113+
sessionID: await coordinator.stateAdapter.sessionID,
114+
participants: Array(await coordinator.stateAdapter.participants.values),
114115
incomingVideoQualitySettings: await coordinator
115116
.stateAdapter
116117
.incomingVideoQualitySettings,
@@ -132,7 +133,8 @@ struct WebRTCJoinRequestFactory {
132133
result.fromSfuID = fromHostname
133134
result.subscriptions = await buildSubscriptionDetails(
134135
nil,
135-
coordinator: coordinator,
136+
sessionID: await coordinator.stateAdapter.sessionID,
137+
participants: Array(await coordinator.stateAdapter.participants.values),
136138
incomingVideoQualitySettings: await coordinator
137139
.stateAdapter
138140
.incomingVideoQualitySettings,
@@ -153,7 +155,8 @@ struct WebRTCJoinRequestFactory {
153155
)
154156
result.subscriptions = await buildSubscriptionDetails(
155157
fromSessionID,
156-
coordinator: coordinator,
158+
sessionID: await coordinator.stateAdapter.sessionID,
159+
participants: Array(await coordinator.stateAdapter.participants.values),
157160
incomingVideoQualitySettings: await coordinator
158161
.stateAdapter
159162
.incomingVideoQualitySettings,
@@ -208,14 +211,14 @@ struct WebRTCJoinRequestFactory {
208211
/// - Returns: An array of track subscription details.
209212
func buildSubscriptionDetails(
210213
_ previousSessionID: String?,
211-
coordinator: WebRTCCoordinator,
214+
sessionID: String,
215+
participants: [CallParticipant],
212216
incomingVideoQualitySettings: IncomingVideoQualitySettings,
213217
file: StaticString = #fileID,
214218
function: StaticString = #function,
215219
line: UInt = #line
216-
) async -> [Stream_Video_Sfu_Signal_TrackSubscriptionDetails] {
217-
let sessionID = await coordinator.stateAdapter.sessionID
218-
return Array(await coordinator.stateAdapter.participants.values)
220+
) -> [Stream_Video_Sfu_Signal_TrackSubscriptionDetails] {
221+
participants
219222
.filter { $0.id != sessionID && $0.id != previousSessionID }
220223
.flatMap { $0.trackSubscriptionDetails(incomingVideoQualitySettings: incomingVideoQualitySettings) }
221224
}

0 commit comments

Comments
 (0)