|
@@ -1,3 +1,5 @@
+import Dispatch
+
 /// A type that produces valueless observations.
 public class Publisher {
     /// The id for the next observation (ids are used to cancel observations).
@@ -55,4 +57,115 @@ public class Publisher {
         #endif
         return self
     }
+
+    /// Observes the publisher and runs an action on the main thread whenever a
+    /// change occurs. This pattern can lead to starvation if events are produced
+    /// faster than the serial handler can process them, so this method deals with
+    /// that by ensuring that only one update is allowed to be waiting at any given
+    /// time. At least one run of `closure` is guaranteed to start after each update,
+    /// but if, for example, 4 updates arrive while a previous update is being
+    /// serviced, then all 4 will be serviced by a single run of `closure`.
+    ///
+    /// Note that some backends don't have a concept of a main thread, so the updates
+    /// won't always end up on a 'main thread', but they are guaranteed to run serially.
+    func observeOnMainThreadAvoidingStarvation<Backend: AppBackend>(
+        backend: Backend,
+        action closure: @escaping () -> Void
+    ) -> Cancellable {
+        // All of the concurrency-related code is there to detect when updates can
+        // be merged together (i.e. when one of the updates is unnecessary).
+        let protectingQueue = DispatchQueue(label: "state update merging")
+        let concurrentUpdateHandlingQueue = DispatchQueue(
+            label: "concurrent update handling queue",
+            attributes: .concurrent
+        )
+        let synchronizationSemaphore = DispatchSemaphore(value: 1)
+
+        // State shared between all calls to the closure defined below.
+        var updateIsQueued = false
+        var updateIsRunning = false
+        var aCurrentJobDidntHaveToWait = false
+
+        return observe {
+            // I'm sorry if you have to make sense of this... Take my comments as a peace offering.
+
+            // Hop to a dispatch queue to avoid blocking any threads in the Swift Structured
+            // Concurrency thread pool in the case that the state update originated from a task.
+            concurrentUpdateHandlingQueue.async {
+                // If no one is running, then we run without waiting, and if someone's running
+                // but no one's waiting, then we wait and prevent anyone else from waiting.
+                // This ensures that at least one update will always happen after every update
+                // received so far, without letting unnecessary updates queue up. The reason
+                // that we can merge updates like this is that all state updates are built equal;
+                // they don't carry any information other than that they happened.
+                var shouldWait = false
+                protectingQueue.sync {
+                    if !updateIsQueued {
+                        shouldWait = true
+                    }
+
+                    if updateIsRunning {
+                        updateIsQueued = true
+                    } else {
+                        updateIsRunning = true
+                        aCurrentJobDidntHaveToWait = true
+                    }
+                }
+
+                guard shouldWait else {
+                    return
+                }
+
+                // Waiting just involves attempting to jump to the main thread.
+                backend.runInMainThread {
+                    // This semaphore is used because some backends don't put us on the main
+                    // thread, since they don't have the concept of a single UI thread like
+                    // macOS does.
+                    //
+                    // If `backend.runInMainThread` is truly putting us on the main thread,
+                    // then this should never have to block significantly; otherwise we're just
+                    // blocking some random thread, so either way we're fine, since we've
+                    // explicitly hopped to a dispatch queue to escape any cooperative
+                    // Swift Structured Concurrency thread pool the state update may have
+                    // originated from.
+                    synchronizationSemaphore.wait()
+
+                    protectingQueue.sync {
+                        // If a current job didn't have to wait, then that's us. Due to
+                        // concurrency, that doesn't mean we were the first update triggered.
+                        // That is, we could've been the job that set `updateIsQueued` to
+                        // true while still being the job that reached this line first (before
+                        // the one that set `updateIsRunning` to true). And that's why I've
+                        // implemented the check in this way, with a protected 'global' and not
+                        // a local variable (being first isn't a property we can know ahead
+                        // of time). I use 'global' in the sense of shared between all calls
+                        // to the state update handling closure for a given ViewGraphNode.
+                        //
+                        // The reason that `aCurrentJobDidntHaveToWait` is needed at all is
+                        // so that we can know whether `updateIsQueued`'s value is due to us
+                        // or someone else/no one.
+                        if aCurrentJobDidntHaveToWait {
+                            aCurrentJobDidntHaveToWait = false
+                        } else {
+                            updateIsQueued = false
+                        }
+                    }
+
+                    closure()
+
+                    // If someone is waiting, then we leave `updateIsRunning` set to true,
+                    // because they'll immediately begin running as soon as we exit and we
+                    // don't want an extra update queueing up until they've actually started.
+                    protectingQueue.sync {
+                        if !updateIsQueued {
+                            updateIsRunning = false
+                        }
+                    }
+
+                    synchronizationSemaphore.signal()
+                }
+            }
+        }
+    }
 }
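
For readers who want the coalescing idea without the backend machinery, here is a minimal, self-contained sketch of the same "at most one running, at most one queued" pattern using only Dispatch. It swaps the backend's main-thread hop and the semaphore for a plain serial `DispatchQueue`, and the names (`makeCoalescingHandler`, `handleUpdate`) are illustrative, not part of the library's API.

```swift
import Dispatch
import Foundation

/// Wraps `handler` so that at most one invocation runs at a time and at most one
/// more is queued; any requests made while a run is in flight are merged into that
/// single queued follow-up run.
func makeCoalescingHandler(
    runningOn serialQueue: DispatchQueue,
    _ handler: @escaping () -> Void
) -> () -> Void {
    let protectingQueue = DispatchQueue(label: "coalescing state")
    var updateIsQueued = false
    var updateIsRunning = false

    func runAndDrainQueue() {
        serialQueue.async {
            handler()

            var runAgain = false
            protectingQueue.sync {
                if updateIsQueued {
                    // One or more requests arrived while we were running; service
                    // them all with a single follow-up run.
                    updateIsQueued = false
                    runAgain = true
                } else {
                    updateIsRunning = false
                }
            }
            if runAgain {
                runAndDrainQueue()
            }
        }
    }

    return {
        var shouldStart = false
        protectingQueue.sync {
            if updateIsRunning {
                // Someone is already running, so merge with the queued run
                // (creating it if it doesn't exist yet).
                updateIsQueued = true
            } else {
                updateIsRunning = true
                shouldStart = true
            }
        }
        if shouldStart {
            runAndDrainQueue()
        }
    }
}

// Fire many updates in quick succession: the handler runs far fewer than ten times,
// but always at least once after the final update.
let handleUpdate = makeCoalescingHandler(runningOn: DispatchQueue(label: "updates")) {
    print("handling merged update")
    Thread.sleep(forTimeInterval: 0.1)
}
for _ in 0..<10 {
    handleUpdate()
}
Thread.sleep(forTimeInterval: 1)
```

The real method can't own a serial queue like this because each run has to land wherever `backend.runInMainThread` puts it, which on some backends isn't a single thread at all; that is exactly why the implementation above adds the semaphore.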
|