Skip to content

Commit 0f4a92a

Browse files
authored
[Performance]Implement timers storage (#824)
1 parent f9d7751 commit 0f4a92a

File tree

14 files changed

+167
-107
lines changed

14 files changed

+167
-107
lines changed

Sources/StreamVideo/CallKit/CallKitService.swift

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
1313

1414
@Injected(\.callCache) private var callCache
1515
@Injected(\.uuidFactory) private var uuidFactory
16+
@Injected(\.timers) private var timers
1617

1718
/// Represents a call that is being managed by the service.
1819
final class CallEntry: Equatable, @unchecked Sendable {
@@ -541,20 +542,16 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
541542
/// - Parameter callState: The state of the call.
542543
open func setUpRingingTimer(for callState: GetCallResponse) {
543544
let timeout = TimeInterval(callState.call.settings.ring.autoCancelTimeoutMs / 1000)
544-
ringingTimerCancellable = Foundation.Timer.publish(
545-
every: timeout,
546-
on: .main,
547-
in: .default
548-
)
549-
.autoconnect()
550-
.sink { [weak self] _ in
551-
log.debug(
552-
"Detected ringing timeout, hanging up...",
553-
subsystems: .callKit
554-
)
555-
self?.callEnded(callState.call.cid, ringingTimedOut: true)
556-
self?.ringingTimerCancellable = nil
557-
}
545+
ringingTimerCancellable = timers
546+
.timer(for: timeout)
547+
.sink { [weak self] _ in
548+
log.debug(
549+
"Detected ringing timeout, hanging up...",
550+
subsystems: .callKit
551+
)
552+
self?.callEnded(callState.call.cid, ringingTimedOut: true)
553+
self?.ringingTimerCancellable = nil
554+
}
558555
}
559556

560557
/// A method that's being called every time the StreamVideo instance is getting updated.

Sources/StreamVideo/CallState.swift

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,8 @@ public struct PermissionRequest: @unchecked Sendable, Identifiable {
3333
public class CallState: ObservableObject {
3434

3535
@Injected(\.streamVideo) var streamVideo
36-
36+
@Injected(\.timers) var timers
37+
3738
/// The id of the current session.
3839
/// When a call is started, a unique session identifier is assigned to the user in the call.
3940
@Published public internal(set) var sessionId: String = ""
@@ -153,7 +154,7 @@ public class CallState: ObservableObject {
153154
}
154155

155156
private var localCallSettingsUpdate = false
156-
private var durationTimer: Foundation.Timer?
157+
private var durationCancellable: AnyCancellable?
157158

158159
/// We mark this one as `nonisolated` to allow us to initialise a state instance without isolation.
159160
/// That's a safe operation because `MainActor` is only required to ensure that all `@Published`
@@ -497,36 +498,21 @@ public class CallState: ObservableObject {
497498

498499
private func setupDurationTimer() {
499500
resetTimer()
500-
durationTimer = Foundation.Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true, block: { [weak self] timer in
501-
guard let self else {
502-
timer.invalidate()
503-
return
504-
}
505-
Task {
506-
await MainActor.run {
507-
self.updateDuration()
501+
durationCancellable = timers
502+
.timer(for: 1.0)
503+
.receive(on: DispatchQueue.main)
504+
.compactMap { [weak self] _ in
505+
if let startedAt = self?.startedAt {
506+
return Date().timeIntervalSince(startedAt)
507+
} else {
508+
return 0
508509
}
509510
}
510-
})
511+
.assign(to: \.duration, onWeak: self)
511512
}
512513

513514
private func resetTimer() {
514-
durationTimer?.invalidate()
515-
durationTimer = nil
516-
}
517-
518-
@objc private func updateDuration() {
519-
guard let startedAt else {
520-
update(duration: 0)
521-
return
522-
}
523-
let timeInterval = Date().timeIntervalSince(startedAt)
524-
update(duration: timeInterval)
525-
}
526-
527-
private func update(duration: TimeInterval) {
528-
if duration != self.duration {
529-
self.duration = duration
530-
}
515+
durationCancellable?.cancel()
516+
durationCancellable = nil
531517
}
532518
}

Sources/StreamVideo/StreamVideo.swift

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ public typealias UserTokenUpdater = @Sendable(UserToken) -> Void
1515
public class StreamVideo: ObservableObject, @unchecked Sendable {
1616

1717
@Injected(\.callCache) private var callCache
18+
@Injected(\.timers) private var timers
1819

1920
public final class State: ObservableObject, @unchecked Sendable {
2021
@Published public internal(set) var connection: ConnectionStatus
@@ -500,11 +501,23 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
500501
log.debug("WS connected")
501502
}
502503

503-
while (!connected && !timeout) {
504-
try await Task.sleep(nanoseconds: 100_000)
505-
}
506-
507-
if timeout {
504+
do {
505+
log.debug("Listening for WS connection")
506+
_ = try await timers
507+
.timer(for: 0.1)
508+
.filter { [weak webSocketClient] _ in
509+
guard let webSocketClient else {
510+
return false
511+
}
512+
switch webSocketClient.connectionState {
513+
case .connected:
514+
return true
515+
default:
516+
return false
517+
}
518+
}
519+
.nextValue(timeout: 30)
520+
} catch {
508521
log.debug("Timeout while waiting for WS connection opening")
509522
throw ClientError.NetworkError()
510523
}
@@ -554,18 +567,16 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
554567
}
555568
log.debug("Waiting for connection id")
556569

557-
while (loadConnectionIdFromHealthcheck() == nil && !timeout) {
558-
try? await Task.sleep(nanoseconds: 100_000)
559-
}
560-
561-
control.cancel()
562-
563-
if let connectionId = loadConnectionIdFromHealthcheck() {
564-
log.debug("Connection id available from the WS")
565-
return connectionId
570+
do {
571+
return try await timers
572+
.timer(for: 0.1)
573+
.log(.debug) { _ in "Waiting for connection id" }
574+
.compactMap { [weak self] _ in self?.loadConnectionIdFromHealthcheck() }
575+
.nextValue(timeout: 5)
576+
} catch {
577+
log.warning("Unable to load connectionId.")
578+
return ""
566579
}
567-
568-
return ""
569580
}
570581

571582
private func loadConnectionIdFromHealthcheck() -> String? {

Sources/StreamVideo/Utils/AudioSession/AudioRecorder/StreamCallAudioRecorder.swift

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ open class StreamCallAudioRecorder: @unchecked Sendable {
1616

1717
@Injected(\.activeCallProvider) private var activeCallProvider
1818
@Injected(\.activeCallAudioSession) private var activeCallAudioSession
19+
@Injected(\.timers) private var timers
1920

2021
/// The builder used to create the AVAudioRecorder instance.
2122
let audioRecorderBuilder: AVAudioRecorderBuilder
@@ -123,10 +124,8 @@ open class StreamCallAudioRecorder: @unchecked Sendable {
123124

124125
updateMetersTimerCancellable?.cancel()
125126
disposableBag.remove("update-meters")
126-
updateMetersTimerCancellable = Foundation
127-
.Timer
128-
.publish(every: 0.1, on: .main, in: .default)
129-
.autoconnect()
127+
updateMetersTimerCancellable = timers
128+
.timer(for: ScreenPropertiesAdapter.currentValue.refreshRate)
130129
.sinkTask(storeIn: disposableBag, identifier: "update-meters") { [weak self, audioRecorder] _ in
131130
audioRecorder.updateMeters()
132131
self?._metersPublisher.send(audioRecorder.averagePower(forChannel: 0))

Sources/StreamVideo/Utils/OrderedCapacityQueue/OrderedCapacityQueue.swift

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@ import Foundation
88
/// A thread-safe queue that maintains a fixed capacity and removes elements after
99
/// a specified time interval.
1010
final class OrderedCapacityQueue<Element> {
11+
@Injected(\.timers) private var timers
12+
1113
private let queue = UnfairQueue()
1214

1315
/// The maximum number of elements the queue can hold.
@@ -48,10 +50,8 @@ final class OrderedCapacityQueue<Element> {
4850
init(capacity: Int, removalTime: TimeInterval) {
4951
self.capacity = capacity
5052
self.removalTime = removalTime
51-
removalTimerCancellable = Foundation
52-
.Timer
53-
.publish(every: ScreenPropertiesAdapter.currentValue.refreshRate, on: .main, in: .default)
54-
.autoconnect()
53+
removalTimerCancellable = timers
54+
.timer(for: ScreenPropertiesAdapter.currentValue.refreshRate)
5555
.sink { [weak self] _ in self?.removeItemsIfRequired() }
5656
}
5757

@@ -86,14 +86,8 @@ final class OrderedCapacityQueue<Element> {
8686
/// should be enabled.
8787
private func toggleRemovalObservation(_ isEnabled: Bool) {
8888
if isEnabled, removalTimerCancellable == nil {
89-
removalTimerCancellable = Foundation
90-
.Timer
91-
.publish(
92-
every: ScreenPropertiesAdapter.currentValue.refreshRate,
93-
on: .main,
94-
in: .default
95-
)
96-
.autoconnect()
89+
removalTimerCancellable = timers
90+
.timer(for: ScreenPropertiesAdapter.currentValue.refreshRate)
9791
.sink { [weak self] _ in self?.removeItemsIfRequired() }
9892
} else if !isEnabled, removalTimerCancellable != nil {
9993
removalTimerCancellable?.cancel()

Sources/StreamVideo/Utils/ScreenPropertiesAdapter/ScreenPropertiesAdapter.swift

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,11 +7,11 @@ import Foundation
77
import UIKit
88
#endif
99

10-
final class ScreenPropertiesAdapter: @unchecked Sendable {
10+
public final class ScreenPropertiesAdapter: @unchecked Sendable {
1111

12-
private(set) var preferredFramesPerSecond: Int = 0
13-
private(set) var refreshRate: TimeInterval = 0
14-
private(set) var scale: CGFloat = 0
12+
public private(set) var preferredFramesPerSecond: Int = 0
13+
public private(set) var refreshRate: TimeInterval = 0
14+
public private(set) var scale: CGFloat = 0
1515

1616
init() {
1717
Task { @MainActor in
@@ -29,11 +29,11 @@ final class ScreenPropertiesAdapter: @unchecked Sendable {
2929
}
3030

3131
extension ScreenPropertiesAdapter: InjectionKey {
32-
nonisolated(unsafe) static var currentValue: ScreenPropertiesAdapter = .init()
32+
public nonisolated(unsafe) static var currentValue: ScreenPropertiesAdapter = .init()
3333
}
3434

3535
extension InjectedValues {
36-
var screenProperties: ScreenPropertiesAdapter {
36+
public var screenProperties: ScreenPropertiesAdapter {
3737
set { Self[ScreenPropertiesAdapter.self] = newValue }
3838
get { Self[ScreenPropertiesAdapter.self] }
3939
}
Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,63 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Combine
6+
import Foundation
7+
8+
/// A protocol that provides publishers emitting time events at a set interval.
9+
public protocol TimerProviding {
10+
/// Returns a publisher that emits the current date every specified interval.
11+
///
12+
/// - Parameter interval: The time interval at which the publisher should emit.
13+
/// - Returns: A publisher emitting `Date` values on the given interval.
14+
func timer(for interval: TimeInterval) -> AnyPublisher<Date, Never>
15+
}
16+
17+
/// A concrete implementation of `TimerProviding` that reuses publishers
18+
/// for the same time intervals.
19+
final class TimerStorage: TimerProviding {
20+
/// A serial queue to synchronize access to the internal storage.
21+
private let queue = UnfairQueue()
22+
23+
/// Stores interval-publisher pairs for reuse.
24+
private var storage: [TimeInterval: AnyPublisher<Date, Never>] = [:]
25+
26+
/// Creates a new instance of `TimerStorage`.
27+
init() {}
28+
29+
/// Returns a shared timer publisher for the given interval. If one already
30+
/// exists, it is reused. Otherwise, a new one is created and stored.
31+
///
32+
/// - Parameter interval: The time interval at which the timer should tick.
33+
/// - Returns: A publisher that emits the current date on the main run loop.
34+
public func timer(for interval: TimeInterval) -> AnyPublisher<Date, Never> {
35+
queue.sync {
36+
if let publisher = storage[interval] {
37+
return publisher
38+
} else {
39+
let publisher = Foundation
40+
.Timer
41+
.publish(every: interval, tolerance: interval, on: .main, in: .common)
42+
.autoconnect()
43+
.eraseToAnyPublisher()
44+
storage[interval] = publisher
45+
return publisher
46+
}
47+
}
48+
}
49+
}
50+
51+
/// An injection key for providing a default `TimerProviding` implementation.
52+
enum TimerProviderKey: InjectionKey {
53+
/// The default value for the `TimerProviding` dependency.
54+
nonisolated(unsafe) public static var currentValue: TimerProviding = TimerStorage()
55+
}
56+
57+
extension InjectedValues {
58+
/// Accessor for the shared `TimerProviding` dependency.
59+
public var timers: TimerProviding {
60+
get { Self[TimerProviderKey.self] }
61+
set { Self[TimerProviderKey.self] = newValue }
62+
}
63+
}

Sources/StreamVideo/WebRTC/v2/StateMachine/Stages/WebRTCCoordinator+Disconnected.swift

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
3030
@unchecked Sendable
3131
{
3232
@Injected(\.internetConnectionObserver) private var internetConnectionObserver
33+
@Injected(\.timers) private var timers
3334

3435
private var internetObservationCancellable: AnyCancellable?
3536
private var timeInStageCancellable: AnyCancellable?
@@ -186,10 +187,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
186187
guard context.disconnectionTimeout > 0 else {
187188
return
188189
}
189-
timeInStageCancellable = Foundation
190-
.Timer
191-
.publish(every: context.disconnectionTimeout, on: .main, in: .default)
192-
.autoconnect()
190+
timeInStageCancellable = timers
191+
.timer(for: context.disconnectionTimeout)
193192
.sink { [weak self] _ in self?.didTimeInStageExpired() }
194193
}
195194

Sources/StreamVideo/WebRTC/v2/WebRTCStatsReporter.swift

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import Foundation
1616
final class WebRTCStatsReporter: @unchecked Sendable {
1717

1818
@Injected(\.thermalStateObserver) private var thermalStateObserver
19+
@Injected(\.timers) private var timers
1920

2021
/// The session ID associated with this reporter.
2122
var sessionID: String
@@ -121,10 +122,8 @@ final class WebRTCStatsReporter: @unchecked Sendable {
121122
}
122123

123124
collectionCancellable?.cancel()
124-
collectionCancellable = Foundation
125-
.Timer
126-
.publish(every: interval, on: .main, in: .default)
127-
.autoconnect()
125+
collectionCancellable = timers
126+
.timer(for: interval)
128127
.log(.debug, subsystems: .webRTC) { _ in "Will collect stats." }
129128
.sink { [weak self] _ in self?.collectStats() }
130129

@@ -142,10 +141,8 @@ final class WebRTCStatsReporter: @unchecked Sendable {
142141
}
143142

144143
deliveryCancellable?.cancel()
145-
deliveryCancellable = Foundation
146-
.Timer
147-
.publish(every: interval, on: .main, in: .default)
148-
.autoconnect()
144+
deliveryCancellable = timers
145+
.timer(for: interval)
149146
.compactMap { [weak self] _ in self?.latestReportSubject.value }
150147
.log(.debug, subsystems: .webRTC) { [weak self] in
151148
"Will deliver stats report (timestamp:\($0.timestamp)) on \(self?.sfuAdapter?.hostname ?? "-")."

0 commit comments

Comments
 (0)