Skip to content

[Performance]Implement timers storage #824

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 11 additions & 14 deletions Sources/StreamVideo/CallKit/CallKitService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {

@Injected(\.callCache) private var callCache
@Injected(\.uuidFactory) private var uuidFactory
@Injected(\.timers) private var timers

/// Represents a call that is being managed by the service.
final class CallEntry: Equatable, @unchecked Sendable {
Expand Down Expand Up @@ -541,20 +542,16 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
/// - Parameter callState: The state of the call.
open func setUpRingingTimer(for callState: GetCallResponse) {
let timeout = TimeInterval(callState.call.settings.ring.autoCancelTimeoutMs / 1000)
ringingTimerCancellable = Foundation.Timer.publish(
every: timeout,
on: .main,
in: .default
)
.autoconnect()
.sink { [weak self] _ in
log.debug(
"Detected ringing timeout, hanging up...",
subsystems: .callKit
)
self?.callEnded(callState.call.cid, ringingTimedOut: true)
self?.ringingTimerCancellable = nil
}
ringingTimerCancellable = timers
.timer(for: timeout)
.sink { [weak self] _ in
log.debug(
"Detected ringing timeout, hanging up...",
subsystems: .callKit
)
self?.callEnded(callState.call.cid, ringingTimedOut: true)
self?.ringingTimerCancellable = nil
}
}

/// A method that's being called every time the StreamVideo instance is getting updated.
Expand Down
42 changes: 14 additions & 28 deletions Sources/StreamVideo/CallState.swift
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ public struct PermissionRequest: @unchecked Sendable, Identifiable {
public class CallState: ObservableObject {

@Injected(\.streamVideo) var streamVideo

@Injected(\.timers) var timers

/// The id of the current session.
/// When a call is started, a unique session identifier is assigned to the user in the call.
@Published public internal(set) var sessionId: String = ""
Expand Down Expand Up @@ -153,7 +154,7 @@ public class CallState: ObservableObject {
}

private var localCallSettingsUpdate = false
private var durationTimer: Foundation.Timer?
private var durationCancellable: AnyCancellable?

/// We mark this one as `nonisolated` to allow us to initialise a state instance without isolation.
/// That's a safe operation because `MainActor` is only required to ensure that all `@Published`
Expand Down Expand Up @@ -497,36 +498,21 @@ public class CallState: ObservableObject {

private func setupDurationTimer() {
resetTimer()
durationTimer = Foundation.Timer.scheduledTimer(withTimeInterval: 1.0, repeats: true, block: { [weak self] timer in
guard let self else {
timer.invalidate()
return
}
Task {
await MainActor.run {
self.updateDuration()
durationCancellable = timers
.timer(for: 1.0)
.receive(on: DispatchQueue.main)
.compactMap { [weak self] _ in
if let startedAt = self?.startedAt {
return Date().timeIntervalSince(startedAt)
} else {
return 0
}
}
})
.assign(to: \.duration, onWeak: self)
}

private func resetTimer() {
durationTimer?.invalidate()
durationTimer = nil
}

@objc private func updateDuration() {
guard let startedAt else {
update(duration: 0)
return
}
let timeInterval = Date().timeIntervalSince(startedAt)
update(duration: timeInterval)
}

private func update(duration: TimeInterval) {
if duration != self.duration {
self.duration = duration
}
durationCancellable?.cancel()
durationCancellable = nil
}
}
43 changes: 27 additions & 16 deletions Sources/StreamVideo/StreamVideo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ public typealias UserTokenUpdater = @Sendable(UserToken) -> Void
public class StreamVideo: ObservableObject, @unchecked Sendable {

@Injected(\.callCache) private var callCache
@Injected(\.timers) private var timers

public final class State: ObservableObject, @unchecked Sendable {
@Published public internal(set) var connection: ConnectionStatus
Expand Down Expand Up @@ -500,11 +501,23 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
log.debug("WS connected")
}

while (!connected && !timeout) {
try await Task.sleep(nanoseconds: 100_000)
}

if timeout {
do {
log.debug("Listening for WS connection")
_ = try await timers
.timer(for: 0.1)
.filter { [weak webSocketClient] _ in
guard let webSocketClient else {
return false
}
switch webSocketClient.connectionState {
case .connected:
return true
default:
return false
}
}
.nextValue(timeout: 30)
} catch {
log.debug("Timeout while waiting for WS connection opening")
throw ClientError.NetworkError()
}
Expand Down Expand Up @@ -554,18 +567,16 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
}
log.debug("Waiting for connection id")

while (loadConnectionIdFromHealthcheck() == nil && !timeout) {
try? await Task.sleep(nanoseconds: 100_000)
}

control.cancel()

if let connectionId = loadConnectionIdFromHealthcheck() {
log.debug("Connection id available from the WS")
return connectionId
do {
return try await timers
.timer(for: 0.1)
.log(.debug) { _ in "Waiting for connection id" }
.compactMap { [weak self] _ in self?.loadConnectionIdFromHealthcheck() }
.nextValue(timeout: 5)
} catch {
log.warning("Unable to load connectionId.")
return ""
}

return ""
}

private func loadConnectionIdFromHealthcheck() -> String? {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ open class StreamCallAudioRecorder: @unchecked Sendable {

@Injected(\.activeCallProvider) private var activeCallProvider
@Injected(\.activeCallAudioSession) private var activeCallAudioSession
@Injected(\.timers) private var timers

/// The builder used to create the AVAudioRecorder instance.
let audioRecorderBuilder: AVAudioRecorderBuilder
Expand Down Expand Up @@ -123,10 +124,8 @@ open class StreamCallAudioRecorder: @unchecked Sendable {

updateMetersTimerCancellable?.cancel()
disposableBag.remove("update-meters")
updateMetersTimerCancellable = Foundation
.Timer
.publish(every: 0.1, on: .main, in: .default)
.autoconnect()
updateMetersTimerCancellable = timers
.timer(for: ScreenPropertiesAdapter.currentValue.refreshRate)
.sinkTask(storeIn: disposableBag, identifier: "update-meters") { [weak self, audioRecorder] _ in
audioRecorder.updateMeters()
self?._metersPublisher.send(audioRecorder.averagePower(forChannel: 0))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import Foundation
/// A thread-safe queue that maintains a fixed capacity and removes elements after
/// a specified time interval.
final class OrderedCapacityQueue<Element> {
@Injected(\.timers) private var timers

private let queue = UnfairQueue()

/// The maximum number of elements the queue can hold.
Expand Down Expand Up @@ -48,10 +50,8 @@ final class OrderedCapacityQueue<Element> {
init(capacity: Int, removalTime: TimeInterval) {
self.capacity = capacity
self.removalTime = removalTime
removalTimerCancellable = Foundation
.Timer
.publish(every: ScreenPropertiesAdapter.currentValue.refreshRate, on: .main, in: .default)
.autoconnect()
removalTimerCancellable = timers
.timer(for: ScreenPropertiesAdapter.currentValue.refreshRate)
.sink { [weak self] _ in self?.removeItemsIfRequired() }
}

Expand Down Expand Up @@ -86,14 +86,8 @@ final class OrderedCapacityQueue<Element> {
/// should be enabled.
private func toggleRemovalObservation(_ isEnabled: Bool) {
if isEnabled, removalTimerCancellable == nil {
removalTimerCancellable = Foundation
.Timer
.publish(
every: ScreenPropertiesAdapter.currentValue.refreshRate,
on: .main,
in: .default
)
.autoconnect()
removalTimerCancellable = timers
.timer(for: ScreenPropertiesAdapter.currentValue.refreshRate)
.sink { [weak self] _ in self?.removeItemsIfRequired() }
} else if !isEnabled, removalTimerCancellable != nil {
removalTimerCancellable?.cancel()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,11 @@ import Foundation
import UIKit
#endif

final class ScreenPropertiesAdapter: @unchecked Sendable {
public final class ScreenPropertiesAdapter: @unchecked Sendable {

private(set) var preferredFramesPerSecond: Int = 0
private(set) var refreshRate: TimeInterval = 0
private(set) var scale: CGFloat = 0
public private(set) var preferredFramesPerSecond: Int = 0
public private(set) var refreshRate: TimeInterval = 0
public private(set) var scale: CGFloat = 0

init() {
Task { @MainActor in
Expand All @@ -29,11 +29,11 @@ final class ScreenPropertiesAdapter: @unchecked Sendable {
}

extension ScreenPropertiesAdapter: InjectionKey {
nonisolated(unsafe) static var currentValue: ScreenPropertiesAdapter = .init()
public nonisolated(unsafe) static var currentValue: ScreenPropertiesAdapter = .init()
}

extension InjectedValues {
var screenProperties: ScreenPropertiesAdapter {
public var screenProperties: ScreenPropertiesAdapter {
set { Self[ScreenPropertiesAdapter.self] = newValue }
get { Self[ScreenPropertiesAdapter.self] }
}
Expand Down
63 changes: 63 additions & 0 deletions Sources/StreamVideo/Utils/Timers/TimerStorage.swift
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
//
// Copyright Β© 2025 Stream.io Inc. All rights reserved.
//

import Combine
import Foundation

/// A protocol that provides publishers emitting time events at a set interval.
public protocol TimerProviding {
/// Returns a publisher that emits the current date every specified interval.
///
/// - Parameter interval: The time interval at which the publisher should emit.
/// - Returns: A publisher emitting `Date` values on the given interval.
func timer(for interval: TimeInterval) -> AnyPublisher<Date, Never>
}

/// A concrete implementation of `TimerProviding` that reuses publishers
/// for the same time intervals.
final class TimerStorage: TimerProviding {
/// A serial queue to synchronize access to the internal storage.
private let queue = UnfairQueue()

/// Stores interval-publisher pairs for reuse.
private var storage: [TimeInterval: AnyPublisher<Date, Never>] = [:]

/// Creates a new instance of `TimerStorage`.
init() {}

/// Returns a shared timer publisher for the given interval. If one already
/// exists, it is reused. Otherwise, a new one is created and stored.
///
/// - Parameter interval: The time interval at which the timer should tick.
/// - Returns: A publisher that emits the current date on the main run loop.
public func timer(for interval: TimeInterval) -> AnyPublisher<Date, Never> {
queue.sync {
if let publisher = storage[interval] {
return publisher
} else {
let publisher = Foundation
.Timer
.publish(every: interval, tolerance: interval, on: .main, in: .common)
.autoconnect()
.eraseToAnyPublisher()
storage[interval] = publisher
return publisher
}
}
}
}

/// An injection key for providing a default `TimerProviding` implementation.
enum TimerProviderKey: InjectionKey {
/// The default value for the `TimerProviding` dependency.
nonisolated(unsafe) public static var currentValue: TimerProviding = TimerStorage()
}

extension InjectedValues {
/// Accessor for the shared `TimerProviding` dependency.
public var timers: TimerProviding {
get { Self[TimerProviderKey.self] }
set { Self[TimerProviderKey.self] = newValue }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ extension WebRTCCoordinator.StateMachine.Stage {
@unchecked Sendable
{
@Injected(\.internetConnectionObserver) private var internetConnectionObserver
@Injected(\.timers) private var timers

private var internetObservationCancellable: AnyCancellable?
private var timeInStageCancellable: AnyCancellable?
Expand Down Expand Up @@ -186,10 +187,8 @@ extension WebRTCCoordinator.StateMachine.Stage {
guard context.disconnectionTimeout > 0 else {
return
}
timeInStageCancellable = Foundation
.Timer
.publish(every: context.disconnectionTimeout, on: .main, in: .default)
.autoconnect()
timeInStageCancellable = timers
.timer(for: context.disconnectionTimeout)
.sink { [weak self] _ in self?.didTimeInStageExpired() }
}

Expand Down
13 changes: 5 additions & 8 deletions Sources/StreamVideo/WebRTC/v2/WebRTCStatsReporter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import Foundation
final class WebRTCStatsReporter: @unchecked Sendable {

@Injected(\.thermalStateObserver) private var thermalStateObserver
@Injected(\.timers) private var timers

/// The session ID associated with this reporter.
var sessionID: String
Expand Down Expand Up @@ -121,10 +122,8 @@ final class WebRTCStatsReporter: @unchecked Sendable {
}

collectionCancellable?.cancel()
collectionCancellable = Foundation
.Timer
.publish(every: interval, on: .main, in: .default)
.autoconnect()
collectionCancellable = timers
.timer(for: interval)
.log(.debug, subsystems: .webRTC) { _ in "Will collect stats." }
.sink { [weak self] _ in self?.collectStats() }

Expand All @@ -142,10 +141,8 @@ final class WebRTCStatsReporter: @unchecked Sendable {
}

deliveryCancellable?.cancel()
deliveryCancellable = Foundation
.Timer
.publish(every: interval, on: .main, in: .default)
.autoconnect()
deliveryCancellable = timers
.timer(for: interval)
.compactMap { [weak self] _ in self?.latestReportSubject.value }
.log(.debug, subsystems: .webRTC) { [weak self] in
"Will deliver stats report (timestamp:\($0.timestamp)) on \(self?.sfuAdapter?.hostname ?? "-")."
Expand Down
Loading