Skip to content

Commit 917af32

Browse files
committed
[Enhancement]AsyncStreams are now backed by Publishers
1 parent 62de050 commit 917af32

File tree

2 files changed

+228
-65
lines changed

2 files changed

+228
-65
lines changed

Sources/StreamVideo/StreamVideo.swift

Lines changed: 82 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
1717
@Injected(\.callCache) private var callCache
1818
@Injected(\.timers) private var timers
1919

20+
private enum DisposableKey: String { case ringEventReceived }
21+
2022
public final class State: ObservableObject, @unchecked Sendable {
2123
@Published public internal(set) var connection: ConnectionStatus
2224
@Published public internal(set) var user: User
@@ -62,6 +64,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
6264
/// A protocol that provides a method to determine the rejection reason for a call.
6365
public lazy var rejectionReasonProvider: RejectionReasonProviding = StreamRejectionReasonProvider(self)
6466

67+
private let eventSubject: PassthroughSubject<WrappedEvent, Never> = .init()
68+
6569
var token: UserToken
6670

6771
private var tokenProvider: UserTokenProvider
@@ -78,7 +82,7 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
7882
private let eventsMiddleware = WSEventsMiddleware()
7983
private var cachedLocation: String?
8084
private var connectTask: Task<Void, Error>?
81-
private var eventHandlers = [EventHandler]()
85+
private let disposableBag = DisposableBag()
8286

8387
/// The notification center used to send and receive notifications about incoming events.
8488
private(set) lazy var eventNotificationCenter: EventNotificationCenter = {
@@ -216,6 +220,8 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
216220
if autoConnectOnInit {
217221
initialConnectIfRequired(apiKey: apiKey)
218222
}
223+
224+
observeCallRingEvents()
219225
}
220226

221227
deinit {
@@ -314,9 +320,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
314320

315321
/// Disconnects the current `StreamVideo` client.
316322
public func disconnect() async {
317-
eventHandlers.forEach { $0.cancel() }
318-
eventHandlers.removeAll()
319-
320323
await withCheckedContinuation { [webSocketClient] continuation in
321324
if let webSocketClient = webSocketClient {
322325
webSocketClient.disconnect {
@@ -327,35 +330,50 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
327330
}
328331
}
329332
}
330-
333+
334+
/// Publishes all received video events coming from the coordinator.
335+
///
336+
/// Use this method to observe all incoming `VideoEvent`s regardless of
337+
/// specific type. Events are filtered to only include those classified as
338+
/// `coordinatorEvent` cases.
339+
///
340+
/// - Returns: A publisher emitting `VideoEvent` instances.
341+
public func eventPublisher() -> AnyPublisher<VideoEvent, Never> {
342+
eventSubject
343+
.compactMap {
344+
guard case let .coordinatorEvent(event) = $0 else {
345+
return nil
346+
}
347+
return event
348+
}
349+
.eraseToAnyPublisher()
350+
}
351+
352+
/// Publishes specific typed WebSocket events.
353+
///
354+
/// Use this method to subscribe only to a specific type of event emitted by
355+
/// the coordinator. The `WSEvent` must conform to `Event`.
356+
///
357+
/// - Parameter event: The type of WebSocket event to observe.
358+
/// - Returns: A publisher emitting events of the specified `WSEvent` type.
359+
public func eventPublisher<WSEvent: Event>(
360+
for event: WSEvent.Type
361+
) -> AnyPublisher<WSEvent, Never> {
362+
eventSubject
363+
.compactMap { $0.unwrap()?.rawValue as? WSEvent }
364+
.eraseToAnyPublisher()
365+
}
366+
331367
/// Subscribes to all video events.
332368
/// - Returns: `AsyncStream` of `VideoEvent`s.
333369
public func subscribe() -> AsyncStream<VideoEvent> {
334-
AsyncStream(VideoEvent.self) { [weak self] continuation in
335-
let eventHandler = EventHandler(handler: { event in
336-
guard case let .coordinatorEvent(event) = event else {
337-
return
338-
}
339-
continuation.yield(event)
340-
}, cancel: { continuation.finish() })
341-
self?.eventHandlers.append(eventHandler)
342-
}
370+
eventPublisher().eraseAsAsyncStream()
343371
}
344372

345373
/// Subscribes to a particular WS event.
346374
/// - Returns: `AsyncStream` of the requested WS event.
347375
public func subscribe<WSEvent: Event>(for event: WSEvent.Type) -> AsyncStream<WSEvent> {
348-
AsyncStream(event) { [weak self] continuation in
349-
let eventHandler = EventHandler(handler: { event in
350-
guard let coordinatorEvent = event.unwrap() else {
351-
return
352-
}
353-
if let event = coordinatorEvent.unwrap() as? WSEvent {
354-
continuation.yield(event)
355-
}
356-
}, cancel: { continuation.finish() })
357-
self?.eventHandlers.append(eventHandler)
358-
}
376+
eventPublisher(for: event).eraseAsAsyncStream()
359377
}
360378

361379
public func queryCalls(
@@ -489,17 +507,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
489507
} else {
490508
throw ClientError.Unknown()
491509
}
492-
var connected = false
493-
var timeout = false
494-
let control = DefaultTimer.schedule(timeInterval: 30, queue: .sdk) {
495-
timeout = true
496-
}
497-
log.debug("Listening for WS connection")
498-
webSocketClient?.onConnected = {
499-
control.cancel()
500-
connected = true
501-
log.debug("WS connected")
502-
}
503510

504511
do {
505512
log.debug("Listening for WS connection")
@@ -560,12 +567,6 @@ public class StreamVideo: ObservableObject, @unchecked Sendable {
560567
|| webSocketClient?.connectionState == .authenticating else {
561568
return ""
562569
}
563-
564-
var timeout = false
565-
let control = DefaultTimer.schedule(timeInterval: 5, queue: .sdk) {
566-
timeout = true
567-
}
568-
log.debug("Waiting for connection id")
569570

570571
do {
571572
return try await timers
@@ -744,24 +745,55 @@ extension StreamVideo: ConnectionStateDelegate {
744745
connectionRecoveryHandler?.webSocketClient(client, didUpdateConnectionState: state)
745746
}
746747
}
747-
eventHandlers.forEach { $0.handler(.internalEvent(WSDisconnected())) }
748+
eventSubject.send(.internalEvent(WSDisconnected()))
748749
case .connected(healthCheckInfo: _):
749-
eventHandlers.forEach { $0.handler(.internalEvent(WSConnected())) }
750+
eventSubject.send(.internalEvent(WSConnected()))
750751
default:
751752
log.debug("Web socket connection state update \(state)")
752753
}
753754
}
755+
756+
/// Observes incoming call ring events from the coordinator.
757+
///
758+
/// This method subscribes to `typeCallRingEvent` messages from the internal
759+
/// event stream. When such an event is received, it attempts to retrieve or
760+
/// create a `Call` object matching the event's call ID and type. Once the
761+
/// call is found, it updates the call's state with the event data and sets it
762+
/// as the current `ringingCall`.
763+
///
764+
/// The resulting subscription is stored in `disposableBag` under a specific
765+
/// key to allow later cancellation or cleanup.
766+
private func observeCallRingEvents() {
767+
eventSubject
768+
.eraseToAnyPublisher()
769+
.compactMap { (source: WrappedEvent) -> CallRingEvent? in
770+
guard
771+
case let .typeCallRingEvent(event) = source.unwrap()
772+
else {
773+
return nil
774+
}
775+
return event
776+
}
777+
.compactMap { [weak self] (source: CallRingEvent) -> (event: CallRingEvent, call: Call)? in
778+
guard let call = self?.call(callType: source.call.type, callId: source.call.id) else {
779+
return nil
780+
}
781+
return (event: source, call: call)
782+
}
783+
.sinkTask(storeIn: disposableBag) { @MainActor [weak self] in
784+
guard let self else { return }
785+
$0.call.state.update(from: $0.event)
786+
self.state.ringingCall = $0.call
787+
}
788+
.store(in: disposableBag, key: DisposableKey.ringEventReceived.rawValue)
789+
}
754790
}
755791

756792
extension StreamVideo: WSEventsSubscriber {
757793

758794
func onEvent(_ event: WrappedEvent) {
759-
for eventHandler in eventHandlers {
760-
eventHandler.handler(event)
761-
}
762-
Task { @MainActor [weak self] in
763-
self?.checkRingEvent(event)
764-
}
795+
eventSubject.send(event)
796+
checkRingEvent(event)
765797
}
766798

767799
private func checkRingEvent(_ event: WrappedEvent) {

0 commit comments

Comments
 (0)