From 917af32e7a2c037124b22b7dc36717ae07f8e806 Mon Sep 17 00:00:00 2001 From: Ilias Pavlidakis Date: Tue, 27 May 2025 21:21:01 +0300 Subject: [PATCH 1/3] [Enhancement]AsyncStreams are now backed by Publishers --- Sources/StreamVideo/StreamVideo.swift | 132 ++++++++++++------- StreamVideoTests/StreamVideo_Tests.swift | 161 ++++++++++++++++++++--- 2 files changed, 228 insertions(+), 65 deletions(-) diff --git a/Sources/StreamVideo/StreamVideo.swift b/Sources/StreamVideo/StreamVideo.swift index d770557dd..837138f59 100644 --- a/Sources/StreamVideo/StreamVideo.swift +++ b/Sources/StreamVideo/StreamVideo.swift @@ -17,6 +17,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { @Injected(\.callCache) private var callCache @Injected(\.timers) private var timers + private enum DisposableKey: String { case ringEventReceived } + public final class State: ObservableObject, @unchecked Sendable { @Published public internal(set) var connection: ConnectionStatus @Published public internal(set) var user: User @@ -62,6 +64,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { /// A protocol that provides a method to determine the rejection reason for a call. public lazy var rejectionReasonProvider: RejectionReasonProviding = StreamRejectionReasonProvider(self) + private let eventSubject: PassthroughSubject = .init() + var token: UserToken private var tokenProvider: UserTokenProvider @@ -78,7 +82,7 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { private let eventsMiddleware = WSEventsMiddleware() private var cachedLocation: String? private var connectTask: Task? - private var eventHandlers = [EventHandler]() + private let disposableBag = DisposableBag() /// The notification center used to send and receive notifications about incoming events. private(set) lazy var eventNotificationCenter: EventNotificationCenter = { @@ -216,6 +220,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { if autoConnectOnInit { initialConnectIfRequired(apiKey: apiKey) } + + observeCallRingEvents() } deinit { @@ -314,9 +320,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { /// Disconnects the current `StreamVideo` client. public func disconnect() async { - eventHandlers.forEach { $0.cancel() } - eventHandlers.removeAll() - await withCheckedContinuation { [webSocketClient] continuation in if let webSocketClient = webSocketClient { webSocketClient.disconnect { @@ -327,35 +330,50 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { } } } - + + /// Publishes all received video events coming from the coordinator. + /// + /// Use this method to observe all incoming `VideoEvent`s regardless of + /// specific type. Events are filtered to only include those classified as + /// `coordinatorEvent` cases. + /// + /// - Returns: A publisher emitting `VideoEvent` instances. + public func eventPublisher() -> AnyPublisher { + eventSubject + .compactMap { + guard case let .coordinatorEvent(event) = $0 else { + return nil + } + return event + } + .eraseToAnyPublisher() + } + + /// Publishes specific typed WebSocket events. + /// + /// Use this method to subscribe only to a specific type of event emitted by + /// the coordinator. The `WSEvent` must conform to `Event`. + /// + /// - Parameter event: The type of WebSocket event to observe. + /// - Returns: A publisher emitting events of the specified `WSEvent` type. + public func eventPublisher( + for event: WSEvent.Type + ) -> AnyPublisher { + eventSubject + .compactMap { $0.unwrap()?.rawValue as? WSEvent } + .eraseToAnyPublisher() + } + /// Subscribes to all video events. /// - Returns: `AsyncStream` of `VideoEvent`s. public func subscribe() -> AsyncStream { - AsyncStream(VideoEvent.self) { [weak self] continuation in - let eventHandler = EventHandler(handler: { event in - guard case let .coordinatorEvent(event) = event else { - return - } - continuation.yield(event) - }, cancel: { continuation.finish() }) - self?.eventHandlers.append(eventHandler) - } + eventPublisher().eraseAsAsyncStream() } /// Subscribes to a particular WS event. /// - Returns: `AsyncStream` of the requested WS event. public func subscribe(for event: WSEvent.Type) -> AsyncStream { - AsyncStream(event) { [weak self] continuation in - let eventHandler = EventHandler(handler: { event in - guard let coordinatorEvent = event.unwrap() else { - return - } - if let event = coordinatorEvent.unwrap() as? WSEvent { - continuation.yield(event) - } - }, cancel: { continuation.finish() }) - self?.eventHandlers.append(eventHandler) - } + eventPublisher(for: event).eraseAsAsyncStream() } public func queryCalls( @@ -489,17 +507,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { } else { throw ClientError.Unknown() } - var connected = false - var timeout = false - let control = DefaultTimer.schedule(timeInterval: 30, queue: .sdk) { - timeout = true - } - log.debug("Listening for WS connection") - webSocketClient?.onConnected = { - control.cancel() - connected = true - log.debug("WS connected") - } do { log.debug("Listening for WS connection") @@ -560,12 +567,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable { || webSocketClient?.connectionState == .authenticating else { return "" } - - var timeout = false - let control = DefaultTimer.schedule(timeInterval: 5, queue: .sdk) { - timeout = true - } - log.debug("Waiting for connection id") do { return try await timers @@ -744,24 +745,55 @@ extension StreamVideo: ConnectionStateDelegate { connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state) } } - eventHandlers.forEach { $0.handler(.internalEvent(WSDisconnected())) } + eventSubject.send(.internalEvent(WSDisconnected())) case .connected(healthCheckInfo: _): - eventHandlers.forEach { $0.handler(.internalEvent(WSConnected())) } + eventSubject.send(.internalEvent(WSConnected())) default: log.debug("Web socket connection state update \(state)") } } + + /// Observes incoming call ring events from the coordinator. + /// + /// This method subscribes to `typeCallRingEvent` messages from the internal + /// event stream. When such an event is received, it attempts to retrieve or + /// create a `Call` object matching the event's call ID and type. Once the + /// call is found, it updates the call's state with the event data and sets it + /// as the current `ringingCall`. + /// + /// The resulting subscription is stored in `disposableBag` under a specific + /// key to allow later cancellation or cleanup. + private func observeCallRingEvents() { + eventSubject + .eraseToAnyPublisher() + .compactMap { (source: WrappedEvent) -> CallRingEvent? in + guard + case let .typeCallRingEvent(event) = source.unwrap() + else { + return nil + } + return event + } + .compactMap { [weak self] (source: CallRingEvent) -> (event: CallRingEvent, call: Call)? in + guard let call = self?.call(callType: source.call.type, callId: source.call.id) else { + return nil + } + return (event: source, call: call) + } + .sinkTask(storeIn: disposableBag) { @MainActor [weak self] in + guard let self else { return } + $0.call.state.update(from: $0.event) + self.state.ringingCall = $0.call + } + .store(in: disposableBag, key: DisposableKey.ringEventReceived.rawValue) + } } extension StreamVideo: WSEventsSubscriber { func onEvent(_ event: WrappedEvent) { - for eventHandler in eventHandlers { - eventHandler.handler(event) - } - Task { @MainActor [weak self] in - self?.checkRingEvent(event) - } + eventSubject.send(event) + checkRingEvent(event) } private func checkRingEvent(_ event: WrappedEvent) { diff --git a/StreamVideoTests/StreamVideo_Tests.swift b/StreamVideoTests/StreamVideo_Tests.swift index 1d882571e..e15de646c 100644 --- a/StreamVideoTests/StreamVideo_Tests.swift +++ b/StreamVideoTests/StreamVideo_Tests.swift @@ -36,7 +36,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { XCTAssert(error is ClientError.MissingPermissions) } } - + func test_streamVideo_makeCall() { // Given let streamVideo = StreamVideo( @@ -70,7 +70,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { // Then XCTAssert(streamVideo.state.activeCall?.cId == call.cId) - + // When call.leave() @@ -79,7 +79,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { // Then XCTAssert(streamVideo.state.activeCall == nil) } - + func test_streamVideo_ringCallAccept() async throws { let httpClient = httpClientWithGetCallResponse() let streamVideo = StreamVideo.mock(httpClient: httpClient) @@ -96,7 +96,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { // Then XCTAssert(streamVideo.state.activeCall == nil) XCTAssert(streamVideo.state.ringingCall?.cId == call.cId) - + // When let callAcceptedEvent = CallAcceptedEvent( call: makeCallResponse(), @@ -116,7 +116,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { XCTAssert(streamVideo.state.ringingCall == nil) XCTAssert(streamVideo.state.activeCall?.cId == call.cId) } - + func test_streamVideo_ringCallReject() async throws { let httpClient = httpClientWithGetCallResponse() let rejectCallResponse = RejectCallResponse(duration: "1") @@ -136,7 +136,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { // Then XCTAssert(streamVideo.state.activeCall == nil) XCTAssert(streamVideo.state.ringingCall?.cId == call.cId) - + // When try await call.reject() await fulfillment { @@ -144,7 +144,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { && streamVideo.state.ringingCall == nil } } - + func test_streamVideo_incomingCallAccept() async throws { // Given let streamVideo = StreamVideo.mock(httpClient: HTTPClient_Mock()) @@ -164,7 +164,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { let incomingCall = WrappedEvent.coordinatorEvent(.typeCallRingEvent(ringEvent)) streamVideo.eventNotificationCenter.process(incomingCall) try await waitForCallEvent() - + // Then XCTAssert(streamVideo.state.activeCall == nil) XCTAssert(streamVideo.state.ringingCall?.cId == call.cId) @@ -180,7 +180,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { XCTAssert(streamVideo.state.ringingCall == nil) XCTAssert(streamVideo.state.activeCall?.cId == call.cId) } - + func test_streamVideo_incomingCallReject() async throws { // Given let httpClient = HTTPClient_Mock() @@ -223,7 +223,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { XCTAssertNil(streamVideo.state.ringingCall) XCTAssertNil(streamVideo.state.activeCall) } - + func test_streamVideo_initialState() { // Given let streamVideo = StreamVideo( @@ -233,19 +233,150 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { videoConfig: .dummy(), tokenProvider: { _ in } ) - + // Then XCTAssert(streamVideo.state.user == StreamVideo.mockUser) XCTAssert(streamVideo.state.connection == .initialized) } - + + // MARK: - Event Publisher & Subscribe Tests + + func test_eventPublisher_filtersOnlyCoordinatorEvents() async { + // Given + let streamVideo = StreamVideo( + apiKey: "key1", + user: .anonymous, + token: StreamVideo.mockToken, + videoConfig: .dummy(), + tokenProvider: { _ in } + ) + self.streamVideo = streamVideo + nonisolated(unsafe) var receivedEvents: [VideoEvent] = [] + + let cancellable = streamVideo.eventPublisher().sink { + receivedEvents.append($0) + } + + // When + let coordinatorEvent = VideoEvent.typeCallAcceptedEvent(.dummy(callCid: "cid")) + let internalEvent = WrappedEvent.internalEvent(WSConnected()) + streamVideo.eventNotificationCenter.process(.coordinatorEvent(coordinatorEvent)) + streamVideo.eventNotificationCenter.process(internalEvent) + await fulfillment { receivedEvents.count == 1 } + + // Then + XCTAssertEqual(receivedEvents.count, 1) + XCTAssertEqual(receivedEvents.first, coordinatorEvent) + + cancellable.cancel() + } + + func test_eventPublisherFor_specificEventType() async { + // Given + let streamVideo = StreamVideo( + apiKey: "key1", + user: .anonymous, + token: StreamVideo.mockToken, + videoConfig: .dummy(), + tokenProvider: { _ in } + ) + self.streamVideo = streamVideo + nonisolated(unsafe) var receivedEvents: [CallAcceptedEvent] = [] + + let cancellable = streamVideo.eventPublisher(for: CallAcceptedEvent.self).sink { + receivedEvents.append($0) + } + + // When + let event = CallAcceptedEvent.dummy() + streamVideo.eventNotificationCenter.process( + .coordinatorEvent(.typeCallAcceptedEvent(event)) + ) + + await fulfillment { receivedEvents.count == 1 } + + // Then + XCTAssertEqual(receivedEvents.count, 1) + XCTAssertEqual(receivedEvents.first, event) + + cancellable.cancel() + } + +// + func test_subscribe_returnsAsyncStreamOfVideoEvents() async throws { + // Given + let streamVideo = StreamVideo( + apiKey: "key1", + user: .anonymous, + token: StreamVideo.mockToken, + videoConfig: .dummy(), + tokenProvider: { _ in } + ) + self.streamVideo = streamVideo + let event = VideoEvent.typeCallAcceptedEvent(.dummy()) + let expectation = expectation(description: "Received async event") + + // When + Task { + await withThrowingTaskGroup { group in + group.addTask { + for await value in streamVideo.subscribe() { + XCTAssertEqual(value, event) + expectation.fulfill() + } + } + group.addTask { + await self.wait(for: 0.5) + streamVideo.eventNotificationCenter.process(.coordinatorEvent(event)) + } + } + } + + // Then + await fulfillment(of: [expectation], timeout: defaultTimeout) + } + + func test_subscribeFor_specificEventType_returnsAsyncStream() async throws { + // Given + let streamVideo = StreamVideo( + apiKey: "key1", + user: .anonymous, + token: StreamVideo.mockToken, + videoConfig: .dummy(), + tokenProvider: { _ in } + ) + self.streamVideo = streamVideo + let content = CallAcceptedEvent.dummy() + let event = VideoEvent.typeCallAcceptedEvent(content) + let expectation = expectation(description: "Received async event") + + // When + Task { + await withThrowingTaskGroup { group in + group.addTask { + for await value in streamVideo.subscribe(for: CallAcceptedEvent.self) { + XCTAssertEqual(value, content) + expectation.fulfill() + } + } + group.addTask { + await self.wait(for: 0.5) + streamVideo.eventNotificationCenter.process(.coordinatorEvent(event)) + } + } + } + + // Then + await fulfillment(of: [expectation], timeout: defaultTimeout) + } + // MARK: - private - + private func makeCallResponse() -> CallResponse { let callResponse = MockResponseBuilder().makeCallResponse(cid: cId) return callResponse } - + private func makeUserResponse() -> UserResponse { UserResponse( blockedUserIds: [], @@ -258,7 +389,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { updatedAt: Date() ) } - + private func httpClientWithGetCallResponse() -> HTTPClient_Mock { let httpClient = HTTPClient_Mock() let callResponse = makeCallResponse() From eaafa046f5c7b8fa77af7fd00238a4587860e2a1 Mon Sep 17 00:00:00 2001 From: Ilias Pavlidakis Date: Tue, 27 May 2025 22:06:35 +0300 Subject: [PATCH 2/3] Fix failing tests --- StreamVideoTests/StreamVideo_Tests.swift | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/StreamVideoTests/StreamVideo_Tests.swift b/StreamVideoTests/StreamVideo_Tests.swift index e15de646c..a5d3cf34a 100644 --- a/StreamVideoTests/StreamVideo_Tests.swift +++ b/StreamVideoTests/StreamVideo_Tests.swift @@ -318,7 +318,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { // When Task { - await withThrowingTaskGroup { group in + await withThrowingTaskGroup(of: Void.self) { group in group.addTask { for await value in streamVideo.subscribe() { XCTAssertEqual(value, event) @@ -352,7 +352,7 @@ final class StreamVideo_Tests: StreamVideoTestCase, @unchecked Sendable { // When Task { - await withThrowingTaskGroup { group in + await withThrowingTaskGroup(of: Void.self) { group in group.addTask { for await value in streamVideo.subscribe(for: CallAcceptedEvent.self) { XCTAssertEqual(value, content) From 646f982d7624bd7aa2a8f5e32d901e2c5e27d3a7 Mon Sep 17 00:00:00 2001 From: Ilias Pavlidakis Date: Wed, 28 May 2025 01:24:42 +0300 Subject: [PATCH 3/3] Remove subscriptions and make tests pass --- .../Reactions/ReactionsAdapter.swift | 23 ++++-------- Sources/StreamVideo/Call.swift | 6 +-- .../StreamVideo/CallKit/CallKitService.swift | 13 ++++--- .../Controllers/CallsController.swift | 20 ++++------ Sources/StreamVideo/StreamVideo.swift | 2 +- .../ClosedCaptionsAdapter.swift | 3 +- .../Events/WSEventsMiddleware.swift | 25 +++++++------ .../StreamVideoSwiftUI/CallViewModel.swift | 37 +++++++++++++------ .../CallingViews/LobbyViewModel_Tests.swift | 16 ++++---- StreamVideoTests/Call/Call_Tests.swift | 2 +- .../Controllers/CallController_Tests.swift | 10 ++--- 11 files changed, 83 insertions(+), 74 deletions(-) diff --git a/DemoApp/Sources/Components/Reactions/ReactionsAdapter.swift b/DemoApp/Sources/Components/Reactions/ReactionsAdapter.swift index 571572136..dfb0d20a0 100644 --- a/DemoApp/Sources/Components/Reactions/ReactionsAdapter.swift +++ b/DemoApp/Sources/Components/Reactions/ReactionsAdapter.swift @@ -16,9 +16,9 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable { var player: AVAudioPlayer? - private var reactionsTask: Task? private var callEndedNotificationObserver: Any? private var activeCallUpdated: AnyCancellable? + private let disposableBag = DisposableBag() private var call: Call? { didSet { subscribeToReactionEvents() } } @Published var showFireworks = false @@ -90,24 +90,22 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable { private func subscribeToReactionEvents() { guard let call else { - reactionsTask?.cancel() + disposableBag.removeAll() return } - let callReactionEventsStream = call.subscribe(for: CallReactionEvent.self) - - reactionsTask = Task { [weak self] in - for await event in callReactionEventsStream { + call + .eventPublisher(for: CallReactionEvent.self) + .sinkTask(storeIn: disposableBag) { [weak self] event in guard let reaction = self?.reaction(for: event) else { - continue + return } self?.handleReaction(reaction, from: event.reaction.user.toUser) log.debug("\(event.reaction.user.name ?? event.reaction.user.id) reacted with reaction:\(reaction.id)") } - return - } + .store(in: disposableBag) } private func reaction(for event: CallReactionEvent) -> Reaction? { @@ -206,12 +204,7 @@ final class ReactionsAdapter: ObservableObject, @unchecked Sendable { } private func handleCallEnded() { - Task { - await MainActor.run { [weak self] in - self?.reactionsTask?.cancel() - self?.reactionsTask = nil - } - } + disposableBag.removeAll() } } diff --git a/Sources/StreamVideo/Call.swift b/Sources/StreamVideo/Call.swift index e83784066..931e70b40 100644 --- a/Sources/StreamVideo/Call.swift +++ b/Sources/StreamVideo/Call.swift @@ -1403,17 +1403,17 @@ public class Call: @unchecked Sendable, WSEventsSubscriber { } } - internal func onEvent(_ event: WrappedEvent) { + internal func onEvent(_ event: WrappedEvent) async { guard case let .coordinatorEvent(videoEvent) = event else { return } guard videoEvent.forCall(cid: cId) else { return } - executeOnMain { [weak self] in + await Task { @MainActor [weak self] in guard let self else { return } self.state.updateState(from: videoEvent) - } + }.value eventSubject.send(event) } diff --git a/Sources/StreamVideo/CallKit/CallKitService.swift b/Sources/StreamVideo/CallKit/CallKitService.swift index e4500e319..275dd2588 100644 --- a/Sources/StreamVideo/CallKit/CallKitService.swift +++ b/Sources/StreamVideo/CallKit/CallKitService.swift @@ -14,6 +14,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable { @Injected(\.callCache) private var callCache @Injected(\.uuidFactory) private var uuidFactory @Injected(\.timers) private var timers + private let disposableBag = DisposableBag() /// Represents a call that is being managed by the service. final class CallEntry: Equatable, @unchecked Sendable { @@ -89,7 +90,6 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable { private var active: UUID? var callCount: Int { storageAccessQueue.sync { _storage.count } } - private var callEventsSubscription: Task? private var callEndedNotificationCancellable: AnyCancellable? private var ringingTimerCancellable: AnyCancellable? @@ -567,8 +567,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable { /// Subscribing to events is being used to reject/stop calls that have been accepted/rejected /// on other devices or components (e.g. incoming callScreen, CallKitService) private func subscribeToCallEvents() { - callEventsSubscription?.cancel() - callEventsSubscription = nil + disposableBag.removeAll() guard let streamVideo else { log.warning( @@ -581,8 +580,10 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable { return } - callEventsSubscription = Task { - for await event in streamVideo.subscribe() { + streamVideo + .eventPublisher() + .sink { [weak self] event in + guard let self else { return } switch event { case let .typeCallEndedEvent(response): callEnded(response.callCid, ringingTimedOut: false) @@ -596,7 +597,7 @@ open class CallKitService: NSObject, CXProviderDelegate, @unchecked Sendable { break } } - } + .store(in: disposableBag) log.debug( "\(type(of: self)) is now subscribed to CallEvent updates.", diff --git a/Sources/StreamVideo/Controllers/CallsController.swift b/Sources/StreamVideo/Controllers/CallsController.swift index a5a9f7bed..9ac21f173 100644 --- a/Sources/StreamVideo/Controllers/CallsController.swift +++ b/Sources/StreamVideo/Controllers/CallsController.swift @@ -31,10 +31,9 @@ public class CallsController: ObservableObject, @unchecked Sendable { private let callsQuery: CallsQuery private let streamVideo: StreamVideo - - private var watchTask: Task? + private var socketDisconnected = false - private var cancellables = DisposableBag() + private var disposableBag = DisposableBag() init(streamVideo: StreamVideo, callsQuery: CallsQuery) { self.callsQuery = callsQuery @@ -49,9 +48,7 @@ public class CallsController: ObservableObject, @unchecked Sendable { } public func cleanUp() { - watchTask?.cancel() - watchTask = nil - cancellables.removeAll() + disposableBag.removeAll() } // MARK: - private @@ -67,7 +64,7 @@ public class CallsController: ObservableObject, @unchecked Sendable { self.reWatchCalls() } } - .store(in: cancellables) + .store(in: disposableBag) } private func loadCalls(shouldRefresh: Bool = false) async throws { @@ -190,11 +187,10 @@ public class CallsController: ObservableObject, @unchecked Sendable { } private func subscribeToWatchEvents() { - watchTask = Task { - for await event in streamVideo.subscribe() { - handle(event: event) - } - } + streamVideo + .eventPublisher() + .sink { [weak self] in self?.handle(event: $0) } + .store(in: disposableBag) } deinit { diff --git a/Sources/StreamVideo/StreamVideo.swift b/Sources/StreamVideo/StreamVideo.swift index 837138f59..964936476 100644 --- a/Sources/StreamVideo/StreamVideo.swift +++ b/Sources/StreamVideo/StreamVideo.swift @@ -791,7 +791,7 @@ extension StreamVideo: ConnectionStateDelegate { extension StreamVideo: WSEventsSubscriber { - func onEvent(_ event: WrappedEvent) { + func onEvent(_ event: WrappedEvent) async { eventSubject.send(event) checkRingEvent(event) } diff --git a/Sources/StreamVideo/Utils/ClosedCaptionsAdapter/ClosedCaptionsAdapter.swift b/Sources/StreamVideo/Utils/ClosedCaptionsAdapter/ClosedCaptionsAdapter.swift index 2a86017ed..90bed0658 100644 --- a/Sources/StreamVideo/Utils/ClosedCaptionsAdapter/ClosedCaptionsAdapter.swift +++ b/Sources/StreamVideo/Utils/ClosedCaptionsAdapter/ClosedCaptionsAdapter.swift @@ -74,7 +74,8 @@ final class ClosedCaptionsAdapter { - Parameter call: The call instance to subscribe for closed caption events. */ private func configure(with call: Call) { - cancellable = AsyncStreamPublisher(call.subscribe(for: ClosedCaptionEvent.self)) + cancellable = call + .eventPublisher(for: ClosedCaptionEvent.self) .map(\.closedCaption) .removeDuplicates() .log(.debug) { "Processing closedCaption for speakerId:\($0.speakerId) text:\($0.text)." } diff --git a/Sources/StreamVideo/WebSockets/Events/WSEventsMiddleware.swift b/Sources/StreamVideo/WebSockets/Events/WSEventsMiddleware.swift index f91639b55..4b5714e02 100644 --- a/Sources/StreamVideo/WebSockets/Events/WSEventsMiddleware.swift +++ b/Sources/StreamVideo/WebSockets/Events/WSEventsMiddleware.swift @@ -4,21 +4,24 @@ import Foundation -final class WSEventsMiddleware: EventMiddleware { - +final class WSEventsMiddleware: EventMiddleware, @unchecked Sendable { + private var subscribers = NSHashTable.weakObjects() func handle(event: WrappedEvent) -> WrappedEvent? { - var streamVideo: StreamVideo? - for subscriber in subscribers.allObjects { - if let subscriber = subscriber as? StreamVideo { - streamVideo = subscriber - } else { - (subscriber as? WSEventsSubscriber)?.onEvent(event) + let allObjects = subscribers.allObjects + Task { + var streamVideo: StreamVideo? + for subscriber in allObjects { + if let subscriber = subscriber as? StreamVideo { + streamVideo = subscriber + } else { + await(subscriber as? WSEventsSubscriber)?.onEvent(event) + } } + await streamVideo?.onEvent(event) } - streamVideo?.onEvent(event) - + return event } @@ -37,5 +40,5 @@ final class WSEventsMiddleware: EventMiddleware { protocol WSEventsSubscriber: AnyObject { - func onEvent(_ event: WrappedEvent) + func onEvent(_ event: WrappedEvent) async } diff --git a/Sources/StreamVideoSwiftUI/CallViewModel.swift b/Sources/StreamVideoSwiftUI/CallViewModel.swift index 54a705136..977176384 100644 --- a/Sources/StreamVideoSwiftUI/CallViewModel.swift +++ b/Sources/StreamVideoSwiftUI/CallViewModel.swift @@ -102,7 +102,11 @@ open class CallViewModel: ObservableObject { @Published public var moreControlsShown = false /// List of the outgoing call members. - @Published public var outgoingCallMembers = [Member]() + @Published public var outgoingCallMembers = [Member]() { + willSet { + _ = 0 + } + } /// Dictionary of the call participants. @Published public private(set) var callParticipants = [String: CallParticipant]() { @@ -169,9 +173,9 @@ open class CallViewModel: ObservableObject { private var lastLayoutChange = Date() private var enteringCallTask: Task? - private var callEventsSubscriptionTask: Task? private var participantsSortComparators = defaultSortPreset private let callEventsHandler = CallEventsHandler() + private let disposableBag = DisposableBag() /// The variable is `true` if CallSettings have been set on the CallViewModel instance (directly or indirectly). /// The variable will be reset to `false` when `leaveCall` will be invoked. @@ -211,7 +215,7 @@ open class CallViewModel: ObservableObject { /// A simple value, signalling that the viewModel has been subscribed to receive callEvents from /// `StreamVideo`. - var isSubscribedToCallEvents: Bool { callEventsSubscriptionTask != nil } + private(set) var isSubscribedToCallEvents: Bool = false public init( participantsLayout: ParticipantsLayout = .grid, @@ -232,7 +236,7 @@ open class CallViewModel: ObservableObject { deinit { enteringCallTask?.cancel() - callEventsSubscriptionTask?.cancel() + disposableBag.removeAll() } /// Toggles the state of the camera (visible vs non-visible). @@ -757,8 +761,10 @@ open class CallViewModel: ObservableObject { } private func subscribeToCallEvents() { - callEventsSubscriptionTask = Task { - for await event in streamVideo.subscribe() { + streamVideo + .eventPublisher() + .sink { [weak self] event in + guard let self else { return } if let callEvent = callEventsHandler.checkForCallEvents(from: event) { switch callEvent { case let .incoming(incomingCall): @@ -793,12 +799,15 @@ open class CallViewModel: ObservableObject { return } - self.participantEvent = participantEvent - try? await Task.sleep(nanoseconds: 2_000_000_000) - self.participantEvent = nil + Task { @MainActor in + self.participantEvent = participantEvent + try? await Task.sleep(nanoseconds: 2_000_000_000) + self.participantEvent = nil + } } } - } + .store(in: disposableBag) + isSubscribedToCallEvents = true } private func handleAcceptedEvent(_ callEvent: CallEvent) { @@ -857,7 +866,13 @@ open class CallViewModel: ObservableObject { return } let outgoingMembersCount = outgoingCallMembers.filter { $0.id != streamVideo.user.id }.count - let rejections = outgoingCall.state.session?.rejectedBy.count ?? 0 + let rejections = { + if outgoingMembersCount == 1, event.user?.id != streamVideo.user.id { + return 1 + } else { + return outgoingCall.state.session?.rejectedBy.count ?? 0 + } + }() let accepted = outgoingCall.state.session?.acceptedBy.count ?? 0 if accepted == 0, rejections >= outgoingMembersCount { Task { diff --git a/StreamVideoSwiftUITests/CallingViews/LobbyViewModel_Tests.swift b/StreamVideoSwiftUITests/CallingViews/LobbyViewModel_Tests.swift index 16d310580..92582a81e 100644 --- a/StreamVideoSwiftUITests/CallingViews/LobbyViewModel_Tests.swift +++ b/StreamVideoSwiftUITests/CallingViews/LobbyViewModel_Tests.swift @@ -17,9 +17,9 @@ final class LobbyViewModelTests: XCTestCase, @unchecked Sendable { mockStreamVideo = nil try await super.tearDown() } - + // MARK: - Join Events Tests - + func test_subscribeForCallJoinUpdates_addsNewParticipant() async throws { let mockCall = MockCall() mockCall.stub( @@ -36,7 +36,7 @@ final class LobbyViewModelTests: XCTestCase, @unchecked Sendable { // we wait for loadCurrentMembers to complete try await Task.sleep(nanoseconds: 500_000_000) - mockCall.onEvent( + await mockCall.onEvent( .coordinatorEvent( .typeCallSessionParticipantJoinedEvent( .init( @@ -51,9 +51,9 @@ final class LobbyViewModelTests: XCTestCase, @unchecked Sendable { await fulfilmentInMainActor { self.subject.participants.count == 1 } } - + // MARK: - Leave Events Tests - + func test_subscribeForCallLeaveUpdates_removesParticipant() async throws { let mockCall = MockCall() mockCall.stub( @@ -69,7 +69,7 @@ final class LobbyViewModelTests: XCTestCase, @unchecked Sendable { _ = subject await fulfilmentInMainActor { self.subject.participants.count == 1 } - mockCall.onEvent( + await mockCall.onEvent( .coordinatorEvent( .typeCallSessionParticipantLeftEvent( .init( @@ -85,7 +85,7 @@ final class LobbyViewModelTests: XCTestCase, @unchecked Sendable { await fulfilmentInMainActor { self.subject.participants.isEmpty } } - + func test_subscribeForCallLeaveUpdates_doesNotRemoveWrongParticipant() async throws { let mockCall = MockCall() mockCall.stub( @@ -108,7 +108,7 @@ final class LobbyViewModelTests: XCTestCase, @unchecked Sendable { _ = subject await fulfilmentInMainActor { self.subject.participants.count == 2 } - mockCall.onEvent( + await mockCall.onEvent( .coordinatorEvent( .typeCallSessionParticipantLeftEvent( .init( diff --git a/StreamVideoTests/Call/Call_Tests.swift b/StreamVideoTests/Call/Call_Tests.swift index bc8019262..177513b9f 100644 --- a/StreamVideoTests/Call/Call_Tests.swift +++ b/StreamVideoTests/Call/Call_Tests.swift @@ -552,7 +552,7 @@ final class Call_Tests: StreamVideoTestCase, @unchecked Sendable { for step in steps { if step.onEventUpdate { - call.onEvent(.coordinatorEvent(step.event)) + await call.onEvent(.coordinatorEvent(step.event)) await fulfillment(timeout: 2) { step.validation(call) } } else { call.state.updateState(from: step.event) diff --git a/StreamVideoTests/Controllers/CallController_Tests.swift b/StreamVideoTests/Controllers/CallController_Tests.swift index 922029964..5dc5b6996 100644 --- a/StreamVideoTests/Controllers/CallController_Tests.swift +++ b/StreamVideoTests/Controllers/CallController_Tests.swift @@ -526,7 +526,7 @@ final class CallController_Tests: StreamVideoTestCase, @unchecked Sendable { .transition(MockTestOnlyStage()) await assertTransitionToStage(.blocked) { - call.onEvent( + await call.onEvent( .coordinatorEvent( .typeBlockedUserEvent( .init( @@ -550,7 +550,7 @@ final class CallController_Tests: StreamVideoTestCase, @unchecked Sendable { subject.call = nil await wait(for: 0.5) - call.onEvent( + await call.onEvent( .coordinatorEvent( .typeCallSessionParticipantCountsUpdatedEvent( .init( @@ -574,7 +574,7 @@ final class CallController_Tests: StreamVideoTestCase, @unchecked Sendable { let call = Call.dummy(callController: subject) await wait(for: 0.5) - call.onEvent( + await call.onEvent( .coordinatorEvent( .typeCallSessionParticipantCountsUpdatedEvent( .init( @@ -601,7 +601,7 @@ final class CallController_Tests: StreamVideoTestCase, @unchecked Sendable { let call = Call.dummy(callController: subject) await wait(for: 0.5) - call.onEvent( + await call.onEvent( .coordinatorEvent( .typeCallSessionParticipantCountsUpdatedEvent( .init( @@ -628,7 +628,7 @@ final class CallController_Tests: StreamVideoTestCase, @unchecked Sendable { let call = Call.dummy(callController: subject) await wait(for: 0.5) - call.onEvent( + await call.onEvent( .coordinatorEvent( .typeCallSessionParticipantCountsUpdatedEvent( .init(