Skip to content

[Performance]Break async stream memory leaks #826

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 8 additions & 15 deletions DemoApp/Sources/Components/Reactions/ReactionsAdapter.swift
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,9 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable {

var player: AVAudioPlayer?

private var reactionsTask: Task<Void, Error>?
private var callEndedNotificationObserver: Any?
private var activeCallUpdated: AnyCancellable?
private let disposableBag = DisposableBag()
private var call: Call? { didSet { subscribeToReactionEvents() } }

@Published var showFireworks = false
Expand Down Expand Up @@ -90,24 +90,22 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable {

private func subscribeToReactionEvents() {
guard let call else {
reactionsTask?.cancel()
disposableBag.removeAll()
return
}

let callReactionEventsStream = call.subscribe(for: CallReactionEvent.self)

reactionsTask = Task { [weak self] in
for await event in callReactionEventsStream {
call
.eventPublisher(for: CallReactionEvent.self)
.sinkTask(storeIn: disposableBag) { [weak self] event in
guard
let reaction = self?.reaction(for: event)
else {
continue
return
}
self?.handleReaction(reaction, from: event.reaction.user.toUser)
log.debug("\(event.reaction.user.name ?? event.reaction.user.id) reacted with reaction:\(reaction.id)")
}
return
}
.store(in: disposableBag)
}

private func reaction(for event: CallReactionEvent) -> Reaction? {
Expand Down Expand Up @@ -206,12 +204,7 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable {
}

private func handleCallEnded() {
Task {
await MainActor.run { [weak self] in
self?.reactionsTask?.cancel()
self?.reactionsTask = nil
}
}
disposableBag.removeAll()
}
}

Expand Down
6 changes: 3 additions & 3 deletions Sources/StreamVideo/Call.swift
Original file line number Diff line number Diff line change
Expand Up @@ -1403,17 +1403,17 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
}
}

internal func onEvent(_ event: WrappedEvent) {
internal func onEvent(_ event: WrappedEvent) async {
guard case let .coordinatorEvent(videoEvent) = event else {
return
}
guard videoEvent.forCall(cid: cId) else {
return
}
executeOnMain { [weak self] in
await Task { @MainActor [weak self] in
guard let self else { return }
self.state.updateState(from: videoEvent)
}
}.value

eventSubject.send(event)
}
Expand Down
13 changes: 7 additions & 6 deletions Sources/StreamVideo/CallKit/CallKitService.swift
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
@Injected(\.callCache) private var callCache
@Injected(\.uuidFactory) private var uuidFactory
@Injected(\.timers) private var timers
private let disposableBag = DisposableBag()

/// Represents a call that is being managed by the service.
final class CallEntry: Equatable, @unchecked Sendable {
Expand Down Expand Up @@ -89,7 +90,6 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
private var active: UUID?
var callCount: Int { storageAccessQueue.sync { _storage.count } }

private var callEventsSubscription: Task<Void, Error>?
private var callEndedNotificationCancellable: AnyCancellable?
private var ringingTimerCancellable: AnyCancellable?

Expand Down Expand Up @@ -567,8 +567,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
/// Subscribing to events is being used to reject/stop calls that have been accepted/rejected
/// on other devices or components (e.g. incoming callScreen, CallKitService)
private func subscribeToCallEvents() {
callEventsSubscription?.cancel()
callEventsSubscription = nil
disposableBag.removeAll()

guard let streamVideo else {
log.warning(
Expand All @@ -581,8 +580,10 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
return
}

callEventsSubscription = Task {
for await event in streamVideo.subscribe() {
streamVideo
.eventPublisher()
.sink { [weak self] event in
guard let self else { return }
switch event {
case let .typeCallEndedEvent(response):
callEnded(response.callCid, ringingTimedOut: false)
Expand All @@ -596,7 +597,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
break
}
}
}
.store(in: disposableBag)

log.debug(
"\(type(of: self)) is now subscribed to CallEvent updates.",
Expand Down
20 changes: 8 additions & 12 deletions Sources/StreamVideo/Controllers/CallsController.swift
Original file line number Diff line number Diff line change
Expand Up @@ -31,10 +31,9 @@ public class CallsController: ObservableObject, @unchecked Sendable {

private let callsQuery: CallsQuery
private let streamVideo: StreamVideo

private var watchTask: Task<Void, Error>?

private var socketDisconnected = false
private var cancellables = DisposableBag()
private var disposableBag = DisposableBag()

init(streamVideo: StreamVideo, callsQuery: CallsQuery) {
self.callsQuery = callsQuery
Expand All @@ -49,9 +48,7 @@ public class CallsController: ObservableObject, @unchecked Sendable {
}

public func cleanUp() {
watchTask?.cancel()
watchTask = nil
cancellables.removeAll()
disposableBag.removeAll()
}

// MARK: - private
Expand All @@ -67,7 +64,7 @@ public class CallsController: ObservableObject, @unchecked Sendable {
self.reWatchCalls()
}
}
.store(in: cancellables)
.store(in: disposableBag)
}

private func loadCalls(shouldRefresh: Bool = false) async throws {
Expand Down Expand Up @@ -190,11 +187,10 @@ public class CallsController: ObservableObject, @unchecked Sendable {
}

private func subscribeToWatchEvents() {
watchTask = Task {
for await event in streamVideo.subscribe() {
handle(event: event)
}
}
streamVideo
.eventPublisher()
.sink { [weak self] in self?.handle(event: $0) }
.store(in: disposableBag)
}

deinit {
Expand Down
134 changes: 83 additions & 51 deletions Sources/StreamVideo/StreamVideo.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
@Injected(\.callCache) private var callCache
@Injected(\.timers) private var timers

private enum DisposableKey: String { case ringEventReceived }

public final class State: ObservableObject, @unchecked Sendable {
@Published public internal(set) var connection: ConnectionStatus
@Published public internal(set) var user: User
Expand Down Expand Up @@ -62,6 +64,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
/// A protocol that provides a method to determine the rejection reason for a call.
public lazy var rejectionReasonProvider: RejectionReasonProviding = StreamRejectionReasonProvider(self)

private let eventSubject: PassthroughSubject<WrappedEvent, Never> = .init()

var token: UserToken

private var tokenProvider: UserTokenProvider
Expand All @@ -78,7 +82,7 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
private let eventsMiddleware = WSEventsMiddleware()
private var cachedLocation: String?
private var connectTask: Task<Void, Error>?
private var eventHandlers = [EventHandler]()
private let disposableBag = DisposableBag()

/// The notification center used to send and receive notifications about incoming events.
private(set) lazy var eventNotificationCenter: EventNotificationCenter = {
Expand Down Expand Up @@ -216,6 +220,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
if autoConnectOnInit {
initialConnectIfRequired(apiKey: apiKey)
}

observeCallRingEvents()
}

deinit {
Expand Down Expand Up @@ -314,9 +320,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {

/// Disconnects the current `StreamVideo` client.
public func disconnect() async {
eventHandlers.forEach { $0.cancel() }
eventHandlers.removeAll()

await withCheckedContinuation { [webSocketClient] continuation in
if let webSocketClient = webSocketClient {
webSocketClient.disconnect {
Expand All @@ -327,35 +330,50 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
}
}
}


/// Publishes all received video events coming from the coordinator.
///
/// Use this method to observe all incoming `VideoEvent`s regardless of
/// specific type. Events are filtered to only include those classified as
/// `coordinatorEvent` cases.
///
/// - Returns: A publisher emitting `VideoEvent` instances.
public func eventPublisher() -> AnyPublisher<VideoEvent, Never> {
eventSubject
.compactMap {
guard case let .coordinatorEvent(event) = $0 else {
return nil
}
return event
}
.eraseToAnyPublisher()
}

/// Publishes specific typed WebSocket events.
///
/// Use this method to subscribe only to a specific type of event emitted by
/// the coordinator. The `WSEvent` must conform to `Event`.
///
/// - Parameter event: The type of WebSocket event to observe.
/// - Returns: A publisher emitting events of the specified `WSEvent` type.
public func eventPublisher<WSEvent: Event>(
for event: WSEvent.Type
) -> AnyPublisher<WSEvent, Never> {
eventSubject
.compactMap { $0.unwrap()?.rawValue as? WSEvent }
.eraseToAnyPublisher()
}

/// Subscribes to all video events.
/// - Returns: `AsyncStream` of `VideoEvent`s.
public func subscribe() -> AsyncStream<VideoEvent> {
AsyncStream(VideoEvent.self) { [weak self] continuation in
let eventHandler = EventHandler(handler: { event in
guard case let .coordinatorEvent(event) = event else {
return
}
continuation.yield(event)
}, cancel: { continuation.finish() })
self?.eventHandlers.append(eventHandler)
}
eventPublisher().eraseAsAsyncStream()
}

/// Subscribes to a particular WS event.
/// - Returns: `AsyncStream` of the requested WS event.
public func subscribe<WSEvent: Event>(for event: WSEvent.Type) -> AsyncStream<WSEvent> {
AsyncStream(event) { [weak self] continuation in
let eventHandler = EventHandler(handler: { event in
guard let coordinatorEvent = event.unwrap() else {
return
}
if let event = coordinatorEvent.unwrap() as? WSEvent {
continuation.yield(event)
}
}, cancel: { continuation.finish() })
self?.eventHandlers.append(eventHandler)
}
eventPublisher(for: event).eraseAsAsyncStream()
}

public func queryCalls(
Expand Down Expand Up @@ -489,17 +507,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
} else {
throw ClientError.Unknown()
}
var connected = false
var timeout = false
let control = DefaultTimer.schedule(timeInterval: 30, queue: .sdk) {
timeout = true
}
log.debug("Listening for WS connection")
webSocketClient?.onConnected = {
control.cancel()
connected = true
log.debug("WS connected")
}

do {
log.debug("Listening for WS connection")
Expand Down Expand Up @@ -560,12 +567,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
|| webSocketClient?.connectionState == .authenticating else {
return ""
}

var timeout = false
let control = DefaultTimer.schedule(timeInterval: 5, queue: .sdk) {
timeout = true
}
log.debug("Waiting for connection id")

do {
return try await timers
Expand Down Expand Up @@ -744,24 +745,55 @@ extension StreamVideo: ConnectionStateDelegate {
connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state)
}
}
eventHandlers.forEach { $0.handler(.internalEvent(WSDisconnected())) }
eventSubject.send(.internalEvent(WSDisconnected()))
case .connected(healthCheckInfo: _):
eventHandlers.forEach { $0.handler(.internalEvent(WSConnected())) }
eventSubject.send(.internalEvent(WSConnected()))
default:
log.debug("Web socket connection state update \(state)")
}
}

/// Observes incoming call ring events from the coordinator.
///
/// This method subscribes to `typeCallRingEvent` messages from the internal
/// event stream. When such an event is received, it attempts to retrieve or
/// create a `Call` object matching the event's call ID and type. Once the
/// call is found, it updates the call's state with the event data and sets it
/// as the current `ringingCall`.
///
/// The resulting subscription is stored in `disposableBag` under a specific
/// key to allow later cancellation or cleanup.
private func observeCallRingEvents() {
eventSubject
.eraseToAnyPublisher()
.compactMap { (source: WrappedEvent) -> CallRingEvent? in
guard
case let .typeCallRingEvent(event) = source.unwrap()
else {
return nil
}
return event
}
.compactMap { [weak self] (source: CallRingEvent) -> (event: CallRingEvent, call: Call)? in
guard let call = self?.call(callType: source.call.type, callId: source.call.id) else {
return nil
}
return (event: source, call: call)
}
.sinkTask(storeIn: disposableBag) { @MainActor [weak self] in
guard let self else { return }
$0.call.state.update(from: $0.event)
self.state.ringingCall = $0.call
}
.store(in: disposableBag, key: DisposableKey.ringEventReceived.rawValue)
}
}

extension StreamVideo: WSEventsSubscriber {

func onEvent(_ event: WrappedEvent) {
for eventHandler in eventHandlers {
eventHandler.handler(event)
}
Task { @MainActor [weak self] in
self?.checkRingEvent(event)
}
func onEvent(_ event: WrappedEvent) async {
eventSubject.send(event)
checkRingEvent(event)
}

private func checkRingEvent(_ event: WrappedEvent) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ final class ClosedCaptionsAdapter {
- Parameter call: The call instance to subscribe for closed caption events.
*/
private func configure(with call: Call) {
cancellable = AsyncStreamPublisher(call.subscribe(for: ClosedCaptionEvent.self))
cancellable = call
.eventPublisher(for: ClosedCaptionEvent.self)
.map(\.closedCaption)
.removeDuplicates()
.log(.debug) { "Processing closedCaption for speakerId:\($0.speakerId) text:\($0.text)." }
Expand Down
Loading