@@ -17,6 +17,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
17
17
@Injected ( \. callCache) private var callCache
18
18
@Injected ( \. timers) private var timers
19
19
20
+ private enum DisposableKey : String { case ringEventReceived }
21
+
20
22
public final class State : ObservableObject , @unchecked Sendable {
21
23
@Published public internal( set) var connection : ConnectionStatus
22
24
@Published public internal( set) var user : User
@@ -62,6 +64,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
62
64
/// A protocol that provides a method to determine the rejection reason for a call.
63
65
public lazy var rejectionReasonProvider : RejectionReasonProviding = StreamRejectionReasonProvider ( self )
64
66
67
+ private let eventSubject : PassthroughSubject < WrappedEvent , Never > = . init( )
68
+
65
69
var token : UserToken
66
70
67
71
private var tokenProvider : UserTokenProvider
@@ -78,7 +82,7 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
78
82
private let eventsMiddleware = WSEventsMiddleware ( )
79
83
private var cachedLocation : String ?
80
84
private var connectTask : Task < Void , Error > ?
81
- private var eventHandlers = [ EventHandler ] ( )
85
+ private let disposableBag = DisposableBag ( )
82
86
83
87
/// The notification center used to send and receive notifications about incoming events.
84
88
private( set) lazy var eventNotificationCenter : EventNotificationCenter = {
@@ -219,6 +223,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
219
223
if autoConnectOnInit {
220
224
initialConnectIfRequired ( apiKey: apiKey)
221
225
}
226
+
227
+ observeCallRingEvents ( )
222
228
}
223
229
224
230
deinit {
@@ -317,9 +323,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
317
323
318
324
/// Disconnects the current `StreamVideo` client.
319
325
public func disconnect( ) async {
320
- eventHandlers. forEach { $0. cancel ( ) }
321
- eventHandlers. removeAll ( )
322
-
323
326
await withCheckedContinuation { [ webSocketClient] continuation in
324
327
if let webSocketClient = webSocketClient {
325
328
webSocketClient. disconnect {
@@ -330,35 +333,50 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
330
333
}
331
334
}
332
335
}
333
-
336
+
337
+ /// Publishes all received video events coming from the coordinator.
338
+ ///
339
+ /// Use this method to observe all incoming `VideoEvent`s regardless of
340
+ /// specific type. Events are filtered to only include those classified as
341
+ /// `coordinatorEvent` cases.
342
+ ///
343
+ /// - Returns: A publisher emitting `VideoEvent` instances.
344
+ public func eventPublisher( ) -> AnyPublisher < VideoEvent , Never > {
345
+ eventSubject
346
+ . compactMap {
347
+ guard case let . coordinatorEvent( event) = $0 else {
348
+ return nil
349
+ }
350
+ return event
351
+ }
352
+ . eraseToAnyPublisher ( )
353
+ }
354
+
355
+ /// Publishes specific typed WebSocket events.
356
+ ///
357
+ /// Use this method to subscribe only to a specific type of event emitted by
358
+ /// the coordinator. The `WSEvent` must conform to `Event`.
359
+ ///
360
+ /// - Parameter event: The type of WebSocket event to observe.
361
+ /// - Returns: A publisher emitting events of the specified `WSEvent` type.
362
+ public func eventPublisher< WSEvent: Event > (
363
+ for event: WSEvent . Type
364
+ ) -> AnyPublisher < WSEvent , Never > {
365
+ eventSubject
366
+ . compactMap { $0. unwrap ( ) ? . rawValue as? WSEvent }
367
+ . eraseToAnyPublisher ( )
368
+ }
369
+
334
370
/// Subscribes to all video events.
335
371
/// - Returns: `AsyncStream` of `VideoEvent`s.
336
372
public func subscribe( ) -> AsyncStream < VideoEvent > {
337
- AsyncStream ( VideoEvent . self) { [ weak self] continuation in
338
- let eventHandler = EventHandler ( handler: { event in
339
- guard case let . coordinatorEvent( event) = event else {
340
- return
341
- }
342
- continuation. yield ( event)
343
- } , cancel: { continuation. finish ( ) } )
344
- self ? . eventHandlers. append ( eventHandler)
345
- }
373
+ eventPublisher ( ) . eraseAsAsyncStream ( )
346
374
}
347
375
348
376
/// Subscribes to a particular WS event.
349
377
/// - Returns: `AsyncStream` of the requested WS event.
350
378
public func subscribe< WSEvent: Event > ( for event: WSEvent . Type ) -> AsyncStream < WSEvent > {
351
- AsyncStream ( event) { [ weak self] continuation in
352
- let eventHandler = EventHandler ( handler: { event in
353
- guard let coordinatorEvent = event. unwrap ( ) else {
354
- return
355
- }
356
- if let event = coordinatorEvent. unwrap ( ) as? WSEvent {
357
- continuation. yield ( event)
358
- }
359
- } , cancel: { continuation. finish ( ) } )
360
- self ? . eventHandlers. append ( eventHandler)
361
- }
379
+ eventPublisher ( for: event) . eraseAsAsyncStream ( )
362
380
}
363
381
364
382
public func queryCalls(
@@ -544,12 +562,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
544
562
|| webSocketClient? . connectionState == . authenticating else {
545
563
return " "
546
564
}
547
-
548
- var timeout = false
549
- let control = DefaultTimer . schedule ( timeInterval: 5 , queue: . sdk) {
550
- timeout = true
551
- }
552
- log. debug ( " Waiting for connection id " )
553
565
554
566
do {
555
567
return try await timers
@@ -728,24 +740,55 @@ extension StreamVideo: ConnectionStateDelegate {
728
740
connectionRecoveryHandler? . webSocketClient ( client, didUpdateConnectionState: state)
729
741
}
730
742
}
731
- eventHandlers . forEach { $0 . handler ( . internalEvent( WSDisconnected ( ) ) ) }
743
+ eventSubject . send ( . internalEvent( WSDisconnected ( ) ) )
732
744
case . connected( healthCheckInfo: _) :
733
- eventHandlers . forEach { $0 . handler ( . internalEvent( WSConnected ( ) ) ) }
745
+ eventSubject . send ( . internalEvent( WSConnected ( ) ) )
734
746
default :
735
747
log. debug ( " Web socket connection state update \( state) " )
736
748
}
737
749
}
750
+
751
+ /// Observes incoming call ring events from the coordinator.
752
+ ///
753
+ /// This method subscribes to `typeCallRingEvent` messages from the internal
754
+ /// event stream. When such an event is received, it attempts to retrieve or
755
+ /// create a `Call` object matching the event's call ID and type. Once the
756
+ /// call is found, it updates the call's state with the event data and sets it
757
+ /// as the current `ringingCall`.
758
+ ///
759
+ /// The resulting subscription is stored in `disposableBag` under a specific
760
+ /// key to allow later cancellation or cleanup.
761
+ private func observeCallRingEvents( ) {
762
+ eventSubject
763
+ . eraseToAnyPublisher ( )
764
+ . compactMap { ( source: WrappedEvent ) -> CallRingEvent ? in
765
+ guard
766
+ case let . typeCallRingEvent( event) = source. unwrap ( )
767
+ else {
768
+ return nil
769
+ }
770
+ return event
771
+ }
772
+ . compactMap { [ weak self] ( source: CallRingEvent ) -> ( event: CallRingEvent , call: Call ) ? in
773
+ guard let call = self ? . call ( callType: source. call. type, callId: source. call. id) else {
774
+ return nil
775
+ }
776
+ return ( event: source, call: call)
777
+ }
778
+ . sinkTask ( storeIn: disposableBag) { @MainActor [ weak self] in
779
+ guard let self else { return }
780
+ $0. call. state. update ( from: $0. event)
781
+ self . state. ringingCall = $0. call
782
+ }
783
+ . store ( in: disposableBag, key: DisposableKey . ringEventReceived. rawValue)
784
+ }
738
785
}
739
786
740
787
extension StreamVideo : WSEventsSubscriber {
741
788
742
- func onEvent( _ event: WrappedEvent ) {
743
- for eventHandler in eventHandlers {
744
- eventHandler. handler ( event)
745
- }
746
- Task { @MainActor [ weak self] in
747
- self ? . checkRingEvent ( event)
748
- }
789
+ func onEvent( _ event: WrappedEvent ) async {
790
+ eventSubject. send ( event)
791
+ checkRingEvent ( event)
749
792
}
750
793
751
794
private func checkRingEvent( _ event: WrappedEvent ) {
0 commit comments