Skip to content

Commit a5df0b6

Browse files
committed
[Performance]Break async stream memory leaks (#826)
* [Enhancement]AsyncStreams are now backed by Publishers * Fix failing tests * Remove subscriptions and make tests pass
1 parent 2ed5c31 commit a5df0b6

File tree

12 files changed

+311
-139
lines changed

12 files changed

+311
-139
lines changed

DemoApp/Sources/Components/Reactions/ReactionsAdapter.swift

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable {
1616

1717
var player: AVAudioPlayer?
1818

19-
private var reactionsTask: Task<Void, Error>?
2019
private var callEndedNotificationObserver: Any?
2120
private var activeCallUpdated: AnyCancellable?
21+
private let disposableBag = DisposableBag()
2222
private var call: Call? { didSet { subscribeToReactionEvents() } }
2323

2424
@Published var showFireworks = false
@@ -90,24 +90,22 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable {
9090

9191
private func subscribeToReactionEvents() {
9292
guard let call else {
93-
reactionsTask?.cancel()
93+
disposableBag.removeAll()
9494
return
9595
}
9696

97-
let callReactionEventsStream = call.subscribe(for: CallReactionEvent.self)
98-
99-
reactionsTask = Task { [weak self] in
100-
for await event in callReactionEventsStream {
97+
call
98+
.eventPublisher(for: CallReactionEvent.self)
99+
.sinkTask(storeIn: disposableBag) { [weak self] event in
101100
guard
102101
let reaction = self?.reaction(for: event)
103102
else {
104-
continue
103+
return
105104
}
106105
self?.handleReaction(reaction, from: event.reaction.user.toUser)
107106
log.debug("\(event.reaction.user.name ?? event.reaction.user.id) reacted with reaction:\(reaction.id)")
108107
}
109-
return
110-
}
108+
.store(in: disposableBag)
111109
}
112110

113111
private func reaction(for event: CallReactionEvent) -> Reaction? {
@@ -206,12 +204,7 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable {
206204
}
207205

208206
private func handleCallEnded() {
209-
Task {
210-
await MainActor.run { [weak self] in
211-
self?.reactionsTask?.cancel()
212-
self?.reactionsTask = nil
213-
}
214-
}
207+
disposableBag.removeAll()
215208
}
216209
}
217210

Sources/StreamVideo/Call.swift

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1403,17 +1403,17 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
14031403
}
14041404
}
14051405

1406-
internal func onEvent(_ event: WrappedEvent) {
1406+
internal func onEvent(_ event: WrappedEvent) async {
14071407
guard case let .coordinatorEvent(videoEvent) = event else {
14081408
return
14091409
}
14101410
guard videoEvent.forCall(cid: cId) else {
14111411
return
14121412
}
1413-
executeOnMain { [weak self] in
1413+
await Task { @MainActor [weak self] in
14141414
guard let self else { return }
14151415
self.state.updateState(from: videoEvent)
1416-
}
1416+
}.value
14171417

14181418
eventSubject.send(event)
14191419
}

Sources/StreamVideo/CallKit/CallKitService.swift

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
1414
@Injected(\.callCache) private var callCache
1515
@Injected(\.uuidFactory) private var uuidFactory
1616
@Injected(\.timers) private var timers
17+
private let disposableBag = DisposableBag()
1718

1819
/// Represents a call that is being managed by the service.
1920
final class CallEntry: Equatable, @unchecked Sendable {
@@ -89,7 +90,6 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
8990
private var active: UUID?
9091
var callCount: Int { storageAccessQueue.sync { _storage.count } }
9192

92-
private var callEventsSubscription: Task<Void, Error>?
9393
private var callEndedNotificationCancellable: AnyCancellable?
9494
private var ringingTimerCancellable: AnyCancellable?
9595

@@ -567,8 +567,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
567567
/// Subscribing to events is being used to reject/stop calls that have been accepted/rejected
568568
/// on other devices or components (e.g. incoming callScreen, CallKitService)
569569
private func subscribeToCallEvents() {
570-
callEventsSubscription?.cancel()
571-
callEventsSubscription = nil
570+
disposableBag.removeAll()
572571

573572
guard let streamVideo else {
574573
log.warning(
@@ -581,8 +580,10 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
581580
return
582581
}
583582

584-
callEventsSubscription = Task {
585-
for await event in streamVideo.subscribe() {
583+
streamVideo
584+
.eventPublisher()
585+
.sink { [weak self] event in
586+
guard let self else { return }
586587
switch event {
587588
case let .typeCallEndedEvent(response):
588589
callEnded(response.callCid, ringingTimedOut: false)
@@ -596,7 +597,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable {
596597
break
597598
}
598599
}
599-
}
600+
.store(in: disposableBag)
600601

601602
log.debug(
602603
"\(type(of: self)) is now subscribed to CallEvent updates.",

Sources/StreamVideo/Controllers/CallsController.swift

Lines changed: 8 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -31,10 +31,9 @@ public class CallsController: ObservableObject, @unchecked Sendable {
3131

3232
private let callsQuery: CallsQuery
3333
private let streamVideo: StreamVideo
34-
35-
private var watchTask: Task<Void, Error>?
34+
3635
private var socketDisconnected = false
37-
private var cancellables = DisposableBag()
36+
private var disposableBag = DisposableBag()
3837

3938
init(streamVideo: StreamVideo, callsQuery: CallsQuery) {
4039
self.callsQuery = callsQuery
@@ -49,9 +48,7 @@ public class CallsController: ObservableObject, @unchecked Sendable {
4948
}
5049

5150
public func cleanUp() {
52-
watchTask?.cancel()
53-
watchTask = nil
54-
cancellables.removeAll()
51+
disposableBag.removeAll()
5552
}
5653

5754
// MARK: - private
@@ -67,7 +64,7 @@ public class CallsController: ObservableObject, @unchecked Sendable {
6764
self.reWatchCalls()
6865
}
6966
}
70-
.store(in: cancellables)
67+
.store(in: disposableBag)
7168
}
7269

7370
private func loadCalls(shouldRefresh: Bool = false) async throws {
@@ -190,11 +187,10 @@ public class CallsController: ObservableObject, @unchecked Sendable {
190187
}
191188

192189
private func subscribeToWatchEvents() {
193-
watchTask = Task {
194-
for await event in streamVideo.subscribe() {
195-
handle(event: event)
196-
}
197-
}
190+
streamVideo
191+
.eventPublisher()
192+
.sink { [weak self] in self?.handle(event: $0) }
193+
.store(in: disposableBag)
198194
}
199195

200196
deinit {

Sources/StreamVideo/StreamVideo.swift

Lines changed: 83 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
1717
@Injected(\.callCache) private var callCache
1818
@Injected(\.timers) private var timers
1919

20+
private enum DisposableKey: String { case ringEventReceived }
21+
2022
public final class State: ObservableObject, @unchecked Sendable {
2123
@Published public internal(set) var connection: ConnectionStatus
2224
@Published public internal(set) var user: User
@@ -62,6 +64,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
6264
/// A protocol that provides a method to determine the rejection reason for a call.
6365
public lazy var rejectionReasonProvider: RejectionReasonProviding = StreamRejectionReasonProvider(self)
6466

67+
private let eventSubject: PassthroughSubject<WrappedEvent, Never> = .init()
68+
6569
var token: UserToken
6670

6771
private var tokenProvider: UserTokenProvider
@@ -78,7 +82,7 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
7882
private let eventsMiddleware = WSEventsMiddleware()
7983
private var cachedLocation: String?
8084
private var connectTask: Task<Void, Error>?
81-
private var eventHandlers = [EventHandler]()
85+
private let disposableBag = DisposableBag()
8286

8387
/// The notification center used to send and receive notifications about incoming events.
8488
private(set) lazy var eventNotificationCenter: EventNotificationCenter = {
@@ -216,6 +220,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
216220
if autoConnectOnInit {
217221
initialConnectIfRequired(apiKey: apiKey)
218222
}
223+
224+
observeCallRingEvents()
219225
}
220226

221227
deinit {
@@ -314,9 +320,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
314320

315321
/// Disconnects the current `StreamVideo` client.
316322
public func disconnect() async {
317-
eventHandlers.forEach { $0.cancel() }
318-
eventHandlers.removeAll()
319-
320323
await withCheckedContinuation { [webSocketClient] continuation in
321324
if let webSocketClient = webSocketClient {
322325
webSocketClient.disconnect {
@@ -327,35 +330,50 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
327330
}
328331
}
329332
}
330-
333+
334+
/// Publishes all received video events coming from the coordinator.
335+
///
336+
/// Use this method to observe all incoming `VideoEvent`s regardless of
337+
/// specific type. Events are filtered to only include those classified as
338+
/// `coordinatorEvent` cases.
339+
///
340+
/// - Returns: A publisher emitting `VideoEvent` instances.
341+
public func eventPublisher() -> AnyPublisher<VideoEvent, Never> {
342+
eventSubject
343+
.compactMap {
344+
guard case let .coordinatorEvent(event) = $0 else {
345+
return nil
346+
}
347+
return event
348+
}
349+
.eraseToAnyPublisher()
350+
}
351+
352+
/// Publishes specific typed WebSocket events.
353+
///
354+
/// Use this method to subscribe only to a specific type of event emitted by
355+
/// the coordinator. The `WSEvent` must conform to `Event`.
356+
///
357+
/// - Parameter event: The type of WebSocket event to observe.
358+
/// - Returns: A publisher emitting events of the specified `WSEvent` type.
359+
public func eventPublisher<WSEvent: Event>(
360+
for event: WSEvent.Type
361+
) -> AnyPublisher<WSEvent, Never> {
362+
eventSubject
363+
.compactMap { $0.unwrap()?.rawValue as? WSEvent }
364+
.eraseToAnyPublisher()
365+
}
366+
331367
/// Subscribes to all video events.
332368
/// - Returns: `AsyncStream` of `VideoEvent`s.
333369
public func subscribe() -> AsyncStream<VideoEvent> {
334-
AsyncStream(VideoEvent.self) { [weak self] continuation in
335-
let eventHandler = EventHandler(handler: { event in
336-
guard case let .coordinatorEvent(event) = event else {
337-
return
338-
}
339-
continuation.yield(event)
340-
}, cancel: { continuation.finish() })
341-
self?.eventHandlers.append(eventHandler)
342-
}
370+
eventPublisher().eraseAsAsyncStream()
343371
}
344372

345373
/// Subscribes to a particular WS event.
346374
/// - Returns: `AsyncStream` of the requested WS event.
347375
public func subscribe<WSEvent: Event>(for event: WSEvent.Type) -> AsyncStream<WSEvent> {
348-
AsyncStream(event) { [weak self] continuation in
349-
let eventHandler = EventHandler(handler: { event in
350-
guard let coordinatorEvent = event.unwrap() else {
351-
return
352-
}
353-
if let event = coordinatorEvent.unwrap() as? WSEvent {
354-
continuation.yield(event)
355-
}
356-
}, cancel: { continuation.finish() })
357-
self?.eventHandlers.append(eventHandler)
358-
}
376+
eventPublisher(for: event).eraseAsAsyncStream()
359377
}
360378

361379
public func queryCalls(
@@ -489,17 +507,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
489507
} else {
490508
throw ClientError.Unknown()
491509
}
492-
var connected = false
493-
var timeout = false
494-
let control = DefaultTimer.schedule(timeInterval: 30, queue: .sdk) {
495-
timeout = true
496-
}
497-
log.debug("Listening for WS connection")
498-
webSocketClient?.onConnected = {
499-
control.cancel()
500-
connected = true
501-
log.debug("WS connected")
502-
}
503510

504511
do {
505512
log.debug("Listening for WS connection")
@@ -560,12 +567,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
560567
|| webSocketClient?.connectionState == .authenticating else {
561568
return ""
562569
}
563-
564-
var timeout = false
565-
let control = DefaultTimer.schedule(timeInterval: 5, queue: .sdk) {
566-
timeout = true
567-
}
568-
log.debug("Waiting for connection id")
569570

570571
do {
571572
return try await timers
@@ -744,24 +745,55 @@ extension StreamVideo: ConnectionStateDelegate {
744745
connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state)
745746
}
746747
}
747-
eventHandlers.forEach { $0.handler(.internalEvent(WSDisconnected())) }
748+
eventSubject.send(.internalEvent(WSDisconnected()))
748749
case .connected(healthCheckInfo: _):
749-
eventHandlers.forEach { $0.handler(.internalEvent(WSConnected())) }
750+
eventSubject.send(.internalEvent(WSConnected()))
750751
default:
751752
log.debug("Web socket connection state update \(state)")
752753
}
753754
}
755+
756+
/// Observes incoming call ring events from the coordinator.
757+
///
758+
/// This method subscribes to `typeCallRingEvent` messages from the internal
759+
/// event stream. When such an event is received, it attempts to retrieve or
760+
/// create a `Call` object matching the event's call ID and type. Once the
761+
/// call is found, it updates the call's state with the event data and sets it
762+
/// as the current `ringingCall`.
763+
///
764+
/// The resulting subscription is stored in `disposableBag` under a specific
765+
/// key to allow later cancellation or cleanup.
766+
private func observeCallRingEvents() {
767+
eventSubject
768+
.eraseToAnyPublisher()
769+
.compactMap { (source: WrappedEvent) -> CallRingEvent? in
770+
guard
771+
case let .typeCallRingEvent(event) = source.unwrap()
772+
else {
773+
return nil
774+
}
775+
return event
776+
}
777+
.compactMap { [weak self] (source: CallRingEvent) -> (event: CallRingEvent, call: Call)? in
778+
guard let call = self?.call(callType: source.call.type, callId: source.call.id) else {
779+
return nil
780+
}
781+
return (event: source, call: call)
782+
}
783+
.sinkTask(storeIn: disposableBag) { @MainActor [weak self] in
784+
guard let self else { return }
785+
$0.call.state.update(from: $0.event)
786+
self.state.ringingCall = $0.call
787+
}
788+
.store(in: disposableBag, key: DisposableKey.ringEventReceived.rawValue)
789+
}
754790
}
755791

756792
extension StreamVideo: WSEventsSubscriber {
757793

758-
func onEvent(_ event: WrappedEvent) {
759-
for eventHandler in eventHandlers {
760-
eventHandler.handler(event)
761-
}
762-
Task { @MainActor [weak self] in
763-
self?.checkRingEvent(event)
764-
}
794+
func onEvent(_ event: WrappedEvent) async {
795+
eventSubject.send(event)
796+
checkRingEvent(event)
765797
}
766798

767799
private func checkRingEvent(_ event: WrappedEvent) {

Sources/StreamVideo/Utils/ClosedCaptionsAdapter/ClosedCaptionsAdapter.swift

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ final class ClosedCaptionsAdapter {
7474
- Parameter call: The call instance to subscribe for closed caption events.
7575
*/
7676
private func configure(with call: Call) {
77-
cancellable = AsyncStreamPublisher(call.subscribe(for: ClosedCaptionEvent.self))
77+
cancellable = call
78+
.eventPublisher(for: ClosedCaptionEvent.self)
7879
.map(\.closedCaption)
7980
.removeDuplicates()
8081
.log(.debug) { "Processing closedCaption for speakerId:\($0.speakerId) text:\($0.text)." }

0 commit comments

Comments
 (0)