1
1
import Dispatch
2
2
import Foundation
3
3
4
- // TODO: Keep weak references to cancellables for downstream observations (instead
5
- // of the current strong references).
6
-
7
4
/// A type that produces valueless observations.
8
5
public class Publisher {
9
6
/// The id for the next observation (ids are used to cancel observations).
10
7
private var nextObservationId = 0
11
8
/// All current observations keyed by their id (ids are used to cancel observations).
12
9
private var observations : [ Int : ( ) -> Void ] = [ : ]
13
- /// Cancellable observations of downstream observers.
14
- private var cancellables : [ Cancellable ] = [ ]
15
10
/// Human-readable tag for debugging purposes.
16
11
private var tag : String ?
17
12
18
- /// The time at which the last update merging event occurred in
19
- /// `observeOnMainThreadAvoidingStarvation`.
20
- private var lastUpdateMergeTime : TimeInterval = 0
21
- /// The amount of time taken per state update, exponentially averaged over time.
22
- private var exponentiallySmoothedUpdateLength : Double = 0
13
+ /// We guard this against data races, with serialUpdateHandlingQueue, and
14
+ /// with our lives.
15
+ private class UpdateStatistics : @unchecked Sendable {
16
+ /// The time at which the last update merging event occurred in
17
+ /// `observeOnMainThreadAvoidingStarvation`.
18
+ var lastUpdateMergeTime : TimeInterval = 0
19
+ /// The amount of time taken per state update, exponentially averaged over time.
20
+ var exponentiallySmoothedUpdateLength : Double = 0
21
+ }
22
+
23
+ private let updateStatistics = UpdateStatistics ( )
24
+
25
+ private let serialUpdateHandlingQueue = DispatchQueue (
26
+ label: " serial update handling "
27
+ )
28
+ private let semaphore = DispatchSemaphore ( value: 1 )
23
29
24
30
/// Creates a new independent publisher.
25
31
public init ( ) { }
@@ -51,7 +57,6 @@ public class Publisher {
51
57
self . send ( )
52
58
} )
53
59
cancellable. tag ( with: " \( tag ?? " no tag " ) <-> \( cancellable. tag ?? " no tag " ) " )
54
- cancellables. append ( cancellable)
55
60
return cancellable
56
61
}
57
62
@@ -91,12 +96,11 @@ public class Publisher {
91
96
/// guaranteed that updates will always run serially.
92
97
func observeAsUIUpdater< Backend: AppBackend > (
93
98
backend: Backend ,
94
- action closure : @escaping ( ) -> Void
99
+ action: @escaping @ MainActor @ Sendable ( ) -> Void
95
100
) -> Cancellable {
96
- let serialUpdateHandlingQueue = DispatchQueue (
97
- label: " serial update handling "
98
- )
99
- let semaphore = DispatchSemaphore ( value: 1 )
101
+ let semaphore = self . semaphore
102
+ let serialUpdateHandlingQueue = self . serialUpdateHandlingQueue
103
+ let updateStatistics = self . updateStatistics
100
104
return observe {
101
105
// Only allow one update to wait at a time.
102
106
guard semaphore. wait ( timeout: . now( ) ) == . success else {
@@ -105,7 +109,7 @@ public class Publisher {
105
109
// as long as it happens within the next update or two.
106
110
let mergeTime = ProcessInfo . processInfo. systemUptime
107
111
serialUpdateHandlingQueue. async {
108
- self . lastUpdateMergeTime = mergeTime
112
+ updateStatistics . lastUpdateMergeTime = mergeTime
109
113
}
110
114
return
111
115
}
@@ -125,22 +129,23 @@ public class Publisher {
125
129
// Run the closure and while we're at it measure how long it takes
126
130
// so that we can use it when throttling if updates start backing up.
127
131
let start = ProcessInfo . processInfo. systemUptime
128
- closure ( )
132
+ action ( )
129
133
let elapsed = ProcessInfo . processInfo. systemUptime - start
130
134
131
135
// I chose exponential smoothing because it's simple to compute, doesn't
132
136
// require storing a window of previous values, and quickly converges to
133
- // a sensible value when the average moves while still somewhat ignoring
137
+ // a sensible value when the average moves, while still somewhat ignoring
134
138
// outliers.
135
- self . exponentiallySmoothedUpdateLength =
136
- elapsed / 2 + self . exponentiallySmoothedUpdateLength / 2
139
+ updateStatistics . exponentiallySmoothedUpdateLength =
140
+ elapsed / 2 + updateStatistics . exponentiallySmoothedUpdateLength / 2
137
141
}
138
142
139
- if ProcessInfo . processInfo. systemUptime - self . lastUpdateMergeTime < 1 {
143
+ if ProcessInfo . processInfo. systemUptime - updateStatistics . lastUpdateMergeTime < 1 {
140
144
// The factor of 1.5 was determined empirically. This algorithm is
141
145
// open for improvements since it's purely here to reduce the risk
142
- // of UI freezes.
143
- let throttlingDelay = self . exponentiallySmoothedUpdateLength * 1.5
146
+ // of UI freezes. A factor of 1.5 equates to a gap between updates of
147
+ // approximately 50% of the average update length.
148
+ let throttlingDelay = updateStatistics. exponentiallySmoothedUpdateLength * 1.5
144
149
145
150
// Sleeping on a dispatch queue generally isn't a good idea because
146
151
// you prevent the queue from servicing any other work, but in this
0 commit comments