Skip to content

Commit 72d0f03

Browse files
authored
Stats v2 (#806)
1 parent 1284648 commit 72d0f03

File tree

85 files changed

+6471
-586
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

85 files changed

+6471
-586
lines changed

Sources/StreamVideo/Controllers/CallController.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -693,7 +693,7 @@ class CallController: @unchecked Sendable {
693693
private func observeStatsReporterUpdates() async {
694694
await webRTCCoordinator
695695
.stateAdapter
696-
.$statsReporter
696+
.$statsAdapter
697697
.compactMap { $0 }
698698
.sink { [weak disposableBag, weak self] statsReporter in
699699
statsReporter

Sources/StreamVideo/Models/CallStatsReport.swift

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@ public struct CallStatsReport: Sendable {
2222
public let participantsStats: ParticipantsStats
2323
/// The timestamp when the call stats report was generated.
2424
public let timestamp: Double
25+
26+
let trackToKindMap: [String: TrackType]
2527
}
2628

2729
/// A struct representing statistics for participants in the call.
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Foundation
6+
7+
/// A type-erased `Encodable` value with optional equality support.
8+
///
9+
/// `AnyEncodable` enables encoding of values whose concrete type is
10+
/// unknown at compile time. It captures the encoding logic using a
11+
/// closure and optionally supports equality if initialized with an
12+
/// `Equatable` value.
13+
struct AnyEncodable: Encodable, Equatable, @unchecked Sendable {
14+
/// Encodes the value to an `Encoder`. This closure is assigned during
15+
/// initialization and stores the encoding logic for the wrapped value.
16+
private let _encode: (Encoder) throws -> Void
17+
/// Compares the stored value against another. If the original value was
18+
/// `Equatable`, this closure performs type-safe equality comparison.
19+
private let _isEqual: (Any) -> Bool
20+
21+
/// The underlying value, which must conform to `Encodable`.
22+
///
23+
/// If equality support is desired, the value must also conform to
24+
/// `Equatable` and be initialized using the corresponding initializer.
25+
let value: Any
26+
27+
/// Initializes from an existing `AnyEncodable` to avoid re-wrapping.
28+
///
29+
/// This initializer allows preserving the original encoding and equality
30+
/// closures when passing or storing `AnyEncodable` values.
31+
init(_ value: AnyEncodable) {
32+
self = value
33+
}
34+
35+
/// Initializes with an `Encodable` value, without equality support.
36+
///
37+
/// The provided value is encoded using a closure captured at
38+
/// initialization time. Equality comparison will always return false.
39+
///
40+
/// - Parameter value: The value to wrap.
41+
init<T: Encodable>(_ value: T) {
42+
_encode = { encoder in
43+
try value.encode(to: encoder)
44+
}
45+
_isEqual = { _ in false }
46+
47+
self.value = value
48+
}
49+
50+
/// Initializes with a value that conforms to both `Encodable` and
51+
/// `Equatable`, enabling equality support.
52+
///
53+
/// This version stores a type-safe equality closure that will return
54+
/// true only if the other value has the same type and is equal.
55+
///
56+
/// - Parameter value: The value to wrap.
57+
init<T: Encodable & Equatable>(_ value: T) {
58+
_encode = { encoder in
59+
try value.encode(to: encoder)
60+
}
61+
_isEqual = { other in
62+
guard let other = other as? T else {
63+
return false
64+
}
65+
return other == value
66+
}
67+
68+
self.value = value
69+
}
70+
71+
/// Encodes the stored value using the provided encoder.
72+
///
73+
/// - Parameter encoder: The encoder to write data to.
74+
/// - Throws: An error if the encoding process fails.
75+
func encode(to encoder: Encoder) throws {
76+
try _encode(encoder)
77+
}
78+
79+
/// Compares two `AnyEncodable` instances for equality.
80+
///
81+
/// This uses the stored equality closure. If the values were not
82+
/// initialized with `Equatable` conformance, this always returns false.
83+
///
84+
/// - Parameters:
85+
/// - lhs: The first `AnyEncodable` value.
86+
/// - rhs: The second `AnyEncodable` value.
87+
/// - Returns: `true` if the wrapped values are equal.
88+
static func == (lhs: AnyEncodable, rhs: AnyEncodable) -> Bool {
89+
lhs._isEqual(rhs.value)
90+
}
91+
}

Sources/StreamVideo/Utils/AudioSession/StreamAudioSession.swift

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -409,6 +409,27 @@ final class StreamAudioSession: @unchecked Sendable, ObservableObject {
409409
}
410410
}
411411

412+
extension StreamAudioSession: Encodable {
413+
// MARK: - Codable
414+
415+
enum CodingKeys: String, CodingKey {
416+
case speakerOn
417+
case device
418+
case deviceIsExternal = "device.isExternal"
419+
case deviceIsSpeaker = "device.isSpeaker"
420+
case deviceIsReceiver = "device.isReceiver"
421+
}
422+
423+
func encode(to encoder: Encoder) throws {
424+
var container = encoder.container(keyedBy: CodingKeys.self)
425+
try container.encode(activeCallSettings.speakerOn, forKey: .speakerOn)
426+
try container.encode("\(audioSession.currentRoute)", forKey: .device)
427+
try container.encode(audioSession.currentRoute.isExternal, forKey: .deviceIsExternal)
428+
try container.encode(audioSession.currentRoute.isSpeaker, forKey: .deviceIsSpeaker)
429+
try container.encode(audioSession.currentRoute.isReceiver, forKey: .deviceIsReceiver)
430+
}
431+
}
432+
412433
/// A key for dependency injection of an `AudioSessionProtocol` instance
413434
/// that represents the active call audio session.
414435
extension StreamAudioSession: InjectionKey {
Lines changed: 144 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,144 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Combine
6+
import Foundation
7+
8+
/// A thread-safe buffer that collects items for later consumption.
9+
///
10+
/// Supports on-demand appending or reactive observation from a
11+
/// publisher source. Items can be consumed in batch, with optional
12+
/// flushing of the internal buffer.
13+
final class ConsumableBucket<Element>: @unchecked Sendable {
14+
15+
/// Represents the type of bucket: on-demand or publisher-driven.
16+
enum BucketType {
17+
/// A manually managed bucket where items are appended directly.
18+
case onDemand
19+
/// A bucket that observes and collects items from a publisher.
20+
case observer(AnyPublisher<Element, Never>)
21+
}
22+
23+
private let queue = UnfairQueue()
24+
25+
private var items: [Element] = []
26+
private var cancellable: AnyCancellable?
27+
28+
/// Initializes the bucket with a transformed publisher source.
29+
///
30+
/// - Parameters:
31+
/// - source: A publisher of source values.
32+
/// - transformer: A transformer to map source to bucket elements.
33+
convenience init<Source, Transformer: ConsumableBucketItemTransformer>(
34+
_ source: AnyPublisher<Source, Never>,
35+
transformer: Transformer
36+
) where Transformer.Input == Source, Transformer.Output == Element {
37+
self.init(
38+
.observer(
39+
source
40+
.compactMap { transformer.transform($0) }
41+
.eraseToAnyPublisher()
42+
)
43+
)
44+
}
45+
46+
/// Initializes the bucket with a transformed publisher source and
47+
/// optionally removes duplicates.
48+
///
49+
/// - Parameters:
50+
/// - source: A publisher of equatable source values.
51+
/// - transformer: A transformer to map source to bucket elements.
52+
/// - removeDuplicates: Whether to remove duplicate values.
53+
convenience init<Source: Equatable, Transformer: ConsumableBucketItemTransformer>(
54+
_ source: AnyPublisher<Source, Never>,
55+
transformer: Transformer,
56+
removeDuplicates: Bool
57+
) where Transformer.Input == Source, Transformer.Output == Element {
58+
if removeDuplicates {
59+
self.init(
60+
.observer(
61+
source
62+
.removeDuplicates()
63+
.compactMap { transformer.transform($0) }
64+
.eraseToAnyPublisher()
65+
)
66+
)
67+
} else {
68+
self.init(source, transformer: transformer)
69+
}
70+
}
71+
72+
/// Initializes the bucket with a publisher of bucket elements.
73+
///
74+
/// - Parameter source: The publisher emitting elements to store.
75+
convenience init(
76+
_ source: AnyPublisher<Element, Never>
77+
) {
78+
self.init(.observer(source))
79+
}
80+
81+
/// Initializes the bucket for manual on-demand use.
82+
convenience init() {
83+
self.init(.onDemand)
84+
}
85+
86+
private init(
87+
_ bucketType: BucketType
88+
) {
89+
switch bucketType {
90+
case .onDemand:
91+
break
92+
case let .observer(publisher):
93+
cancellable = publisher.sink { [weak self] in self?.process($0) }
94+
}
95+
}
96+
97+
/// Appends a single element to the bucket.
98+
///
99+
/// - Parameter element: The item to be appended.
100+
func append(_ element: Element) {
101+
queue.sync { [weak self] in self?.items.append(element) }
102+
}
103+
104+
/// Returns the items in the bucket.
105+
///
106+
/// - Parameter flush: Whether to clear the buffer after returning.
107+
/// - Returns: An array of stored elements.
108+
func consume(flush: Bool = false) -> [Element] {
109+
if flush {
110+
return queue.sync {
111+
let result = items
112+
items = []
113+
return result
114+
}
115+
} else {
116+
return queue.sync { items }
117+
}
118+
}
119+
120+
/// Inserts elements at a specific position in the buffer.
121+
///
122+
/// - Parameters:
123+
/// - items: The elements to insert.
124+
/// - index: The position at which to insert them.
125+
func insert(_ items: [Element], at index: Int) {
126+
guard !items.isEmpty else {
127+
return
128+
}
129+
queue.sync {
130+
self.items.insert(contentsOf: items, at: index)
131+
}
132+
}
133+
134+
// MARK: - Private Helpers
135+
136+
/// Processes an incoming item from the observed publisher.
137+
///
138+
/// - Parameter newItem: The new item to append.
139+
private func process(_ newItem: Element) {
140+
queue.sync {
141+
self.items.append(newItem)
142+
}
143+
}
144+
}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Foundation
6+
7+
/// A transformer used by `ConsumableBucket` to map source values to
8+
/// consumable elements.
9+
///
10+
/// This protocol allows a bucket to observe one data type while
11+
/// storing another, through a transformation step. Implementors define
12+
/// the transformation logic between `Input` and `Output`.
13+
protocol ConsumableBucketItemTransformer {
14+
15+
/// The input type received from the upstream publisher.
16+
associatedtype Input
17+
/// The output type to be stored in the `ConsumableBucket`.
18+
associatedtype Output
19+
20+
/// Transforms an input into a bucket-storable output value.
21+
///
22+
/// - Parameter input: The incoming value to transform.
23+
/// - Returns: The transformed output.
24+
func transform(_ input: Input) -> Output
25+
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
//
2+
// Copyright © 2025 Stream.io Inc. All rights reserved.
3+
//
4+
5+
import Foundation
6+
7+
extension Date {
8+
/// The number of milliseconds since 1970-01-01 00:00:00 UTC.
9+
///
10+
/// This converts the date's time interval since the Unix epoch into a
11+
/// rounded 64-bit integer value in milliseconds. Useful for precise
12+
/// timestamping or encoding dates for transport in APIs.
13+
var millisecondsSince1970: Int64 {
14+
Int64((timeIntervalSince1970 * 1000.0).rounded())
15+
}
16+
}

0 commit comments

Comments
 (0)