Skip to content

Commit a423812

Browse files
authored
[Fix]Reorganize Call state-machine and its transitions (#747)
1 parent 191140c commit a423812

File tree

50 files changed

+2017
-1160
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

50 files changed

+2017
-1160
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
44

55
# Upcoming
66

7+
### 🔄 Changed
8+
- Improved the flow of joining a call [#747](https://github.com/GetStream/stream-video-swift/pull/747)
9+
710
### 🐞 Fixed
811
- Fix an issue causing audio/video misalignment with the server. [#772](https://github.com/GetStream/stream-video-swift/pull/772)
912

Sources/StreamVideo/Call.swift

Lines changed: 77 additions & 83 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
1313
@Injected(\.streamVideo) var streamVideo
1414
@Injected(\.callCache) var callCache
1515

16-
private lazy var stateMachine: StreamCallStateMachine = .init(self)
16+
private lazy var stateMachine: StateMachine = .init(self)
1717

1818
@MainActor
1919
public internal(set) var state = CallState()
@@ -53,6 +53,9 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
5353
internal let coordinatorClient: DefaultAPI
5454
private var cancellables = DisposableBag()
5555

56+
/// A serialQueueActor ensuring that call operations (e.g. join) will happen in a serial manner.
57+
private let callOperationSerialQueue = SerialActorQueue()
58+
5659
/// This adapter is used to manage closed captions for the
5760
/// call.
5861
private lazy var closedCaptionsAdapter = ClosedCaptionsAdapter(self)
@@ -144,50 +147,42 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
144147
notify: Bool = false,
145148
callSettings: CallSettings? = nil
146149
) async throws -> JoinCallResponse {
147-
let currentStage = stateMachine.currentStage
148-
switch currentStage.id {
149-
case .joining:
150-
break
151-
case .joined where currentStage is StreamCallStateMachine.Stage.JoinedStage:
152-
let stage = currentStage as! StreamCallStateMachine.Stage.JoinedStage
153-
return stage.response
154-
default:
155-
stateMachine.transition(
156-
.joining(
157-
self,
158-
actionBlock: { [weak self] in
159-
guard let self else { throw ClientError.Unexpected() }
160-
return try await executeTask(retryPolicy: .fastAndSimple, task: { [weak self] in
161-
guard let self else { throw ClientError.Unexpected() }
162-
let response = try await callController.joinCall(
150+
try await callOperationSerialQueue.sync { [weak self] in
151+
guard let self else {
152+
throw ClientError()
153+
}
154+
let currentStage = stateMachine.currentStage
155+
156+
if
157+
currentStage.id == .joined,
158+
case let .joined(joinResponse) = currentStage.context.output {
159+
return joinResponse
160+
} else if
161+
currentStage.id == .joining,
162+
case let .join(input) = currentStage.context.input {
163+
return try await input
164+
.deliverySubject
165+
.nextValue(timeout: CallConfiguration.timeout.join)
166+
} else {
167+
let deliverySubject = PassthroughSubject<JoinCallResponse, Error>()
168+
stateMachine.transition(
169+
.joining(
170+
self,
171+
input: .join(
172+
.init(
163173
create: create,
164174
callSettings: callSettings,
165175
options: options,
166176
ring: ring,
167-
notify: notify
177+
notify: notify,
178+
deliverySubject: deliverySubject
168179
)
169-
if let callSettings {
170-
await state.update(callSettings: callSettings)
171-
}
172-
await state.update(from: response)
173-
let updated = await state.callSettings
174-
updateCallSettingsManagers(with: updated)
175-
Task { @MainActor [weak self] in
176-
self?.streamVideo.state.activeCall = self
177-
}
178-
return response
179-
})
180-
}
180+
)
181+
)
181182
)
182-
)
183+
return try await deliverySubject.nextValue(timeout: CallConfiguration.timeout.join)
184+
}
183185
}
184-
185-
return try await stateMachine
186-
.nextStageShouldBe(
187-
StreamCallStateMachine.Stage.JoinedStage.self,
188-
dropFirst: 1
189-
)
190-
.response
191186
}
192187

193188
/// Gets the call on the backend with the given parameters.
@@ -350,24 +345,25 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
350345
@discardableResult
351346
public func accept() async throws -> AcceptCallResponse {
352347
let currentStage = stateMachine.currentStage
353-
switch currentStage.id {
354-
case .accepting:
355-
break
356-
case .accepted where currentStage is StreamCallStateMachine.Stage.AcceptedStage:
357-
let stage = currentStage as! StreamCallStateMachine.Stage.AcceptedStage
358-
return stage.response
359-
default:
360-
stateMachine.transition(.accepting(self, actionBlock: { [coordinatorClient, callType, callId] in
361-
try await coordinatorClient.acceptCall(type: callType, id: callId)
362-
}))
363-
}
364348

365-
return try await stateMachine
366-
.nextStageShouldBe(
367-
StreamCallStateMachine.Stage.AcceptedStage.self,
368-
dropFirst: 1
349+
if
350+
currentStage.id == .accepted,
351+
case let .accepted(response) = currentStage.context.output {
352+
return response
353+
} else if
354+
currentStage.id == .accepting,
355+
case let .accepting(deliverySubject) = currentStage.context.input {
356+
return try await deliverySubject.nextValue(timeout: CallConfiguration.timeout.accept)
357+
} else {
358+
let deliverySubject = PassthroughSubject<AcceptCallResponse, Error>()
359+
stateMachine.transition(
360+
.accepting(
361+
self,
362+
input: .accepting(deliverySubject: deliverySubject)
363+
)
369364
)
370-
.response
365+
return try await deliverySubject.nextValue(timeout: CallConfiguration.timeout.accept)
366+
}
371367
}
372368

373369
/// Rejects a call with an optional reason.
@@ -378,34 +374,27 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
378374
@discardableResult
379375
public func reject(reason: String? = nil) async throws -> RejectCallResponse {
380376
let currentStage = stateMachine.currentStage
381-
switch currentStage.id {
382-
case .rejecting:
383-
break
384-
case .rejected where currentStage is StreamCallStateMachine.Stage.RejectedStage:
385-
let stage = currentStage as! StreamCallStateMachine.Stage.RejectedStage
386-
return stage.response
387-
default:
388-
stateMachine.transition(.rejecting(self, actionBlock: { [coordinatorClient, callType, callId, streamVideo, cId] in
389-
let response = try await coordinatorClient.rejectCall(
390-
type: callType,
391-
id: callId,
392-
rejectCallRequest: .init(reason: reason)
393-
)
394-
if streamVideo.state.ringingCall?.cId == cId {
395-
Task { @MainActor in
396-
streamVideo.state.ringingCall = nil
397-
}
398-
}
399-
return response
400-
}))
401-
}
402377

403-
return try await stateMachine
404-
.nextStageShouldBe(
405-
StreamCallStateMachine.Stage.RejectedStage.self,
406-
dropFirst: 1
378+
if
379+
currentStage.id == .rejected,
380+
case let .rejected(response) = currentStage.context.output {
381+
return response
382+
} else if
383+
currentStage.id == .rejecting,
384+
case let .rejecting(input) = currentStage.context.input {
385+
return try await input
386+
.deliverySubject
387+
.nextValue(timeout: CallConfiguration.timeout.reject)
388+
} else {
389+
let deliverySubject = PassthroughSubject<RejectCallResponse, Error>()
390+
stateMachine.transition(
391+
.rejecting(
392+
self,
393+
input: .rejecting(.init(deliverySubject: deliverySubject))
394+
)
407395
)
408-
.response
396+
return try await deliverySubject.nextValue(timeout: CallConfiguration.timeout.reject)
397+
}
409398
}
410399

411400
/// Adds the given user to the list of blocked users for the call.
@@ -521,7 +510,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
521510
cancellables.removeAll()
522511
callController.leave()
523512
closedCaptionsAdapter.stop()
524-
stateMachine.transition(.idle(self))
513+
stateMachine.transition(.idle(.init(call: self)))
525514
/// Upon `Call.leave` we remove the call from the cache. Any further actions that are required
526515
/// to happen on the call object (e.g. rejoin) will need to fetch a new instance from `StreamVideo`
527516
/// client.
@@ -1391,7 +1380,12 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
13911380
if stateMachine.currentStage.id == .joined {
13921381
state.disconnectionError = error
13931382
}
1394-
stateMachine.transition(.error(self, error: error))
1383+
stateMachine.transition(
1384+
.error(
1385+
.init(call: self),
1386+
error: error
1387+
)
1388+
)
13951389
}
13961390

13971391
// MARK: - private
@@ -1531,7 +1525,7 @@ public class Call: @unchecked Sendable, WSEventsSubscriber {
15311525
}
15321526
}
15331527

1534-
private func updateCallSettingsManagers(with callSettings: CallSettings) {
1528+
func updateCallSettingsManagers(with callSettings: CallSettings) {
15351529
microphone.status = callSettings.audioOn ? .enabled : .disabled
15361530
camera.status = callSettings.videoOn ? .enabled : .disabled
15371531
camera.direction = callSettings.cameraPosition
Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,57 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Foundation
6+
7+
/// Configuration settings for call operations.
8+
///
9+
/// This enum provides timeout values and other configuration settings
10+
/// for various call operations such as joining, accepting, and rejecting calls.
11+
enum CallConfiguration {
12+
13+
/// Timeout settings for different call operations.
14+
struct Timeout {
15+
/// Timeout duration for joining a call in seconds.
16+
var join: TimeInterval
17+
18+
/// Timeout duration for accepting a call in seconds.
19+
var accept: TimeInterval
20+
21+
/// Timeout duration for rejecting a call in seconds.
22+
var reject: TimeInterval
23+
24+
/// Timeout values for authentication in production environment.
25+
///
26+
/// These values are used when the app is running in production mode.
27+
static let production = Timeout(
28+
join: 30,
29+
accept: 10,
30+
reject: 10
31+
)
32+
33+
#if STREAM_TESTS
34+
/// Timeout values for authentication in test environment.
35+
///
36+
/// These values are used when the app is running in test mode.
37+
/// They are shorter than production values to speed up testing.
38+
static let testing = Timeout(
39+
join: 10,
40+
accept: 10,
41+
reject: 10
42+
)
43+
#endif
44+
}
45+
46+
/// Timeout values for various Call operations.
47+
///
48+
/// This property returns the appropriate timeout values based on the
49+
/// current build configuration (test or production).
50+
static var timeout: Timeout {
51+
#if STREAM_TESTS
52+
return .testing
53+
#else
54+
return .production
55+
#endif
56+
}
57+
}
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Combine
6+
import Foundation
7+
8+
/// A state machine that manages the states of a stream call.
9+
extension Call {
10+
final class StateMachine {
11+
/// The underlying state machine that handles state transitions.
12+
private let stateMachine: StreamStateMachine<Stage>
13+
14+
/// The current stage of the state machine.
15+
var currentStage: Stage { stateMachine.currentStage }
16+
17+
/// A publisher that emits the current stage of the state machine.
18+
var publisher: AnyPublisher<Stage, Never> { stateMachine.publisher.eraseToAnyPublisher() }
19+
20+
/// Initializes the state machine with the idle stage for a given call.
21+
///
22+
/// - Parameter call: The call to be managed by the state machine.
23+
init(_ call: Call) {
24+
stateMachine = .init(initialStage: .idle(.init(call: call)))
25+
}
26+
27+
/// Transitions the state machine to the given next stage.
28+
///
29+
/// - Parameter nextStage: The next stage to transition to.
30+
/// - Throws: An error if the transition is invalid.
31+
func transition(
32+
_ nextStage: Stage,
33+
file: StaticString = #file,
34+
function: StaticString = #function,
35+
line: UInt = #line
36+
) {
37+
stateMachine.transition(
38+
to: nextStage,
39+
file: file,
40+
function: function,
41+
line: line
42+
)
43+
}
44+
}
45+
}

0 commit comments

Comments
 (0)