Skip to content

Commit d2ab63e

Browse files
author
pjechris
authored
tech(observer): Use registry to observe arrays (#57)
1 parent cacb619 commit d2ab63e

File tree

8 files changed

+200
-77
lines changed

8 files changed

+200
-77
lines changed

Sources/CohesionKit/Identity/IdentityStore.swift

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ public class IdentityMap {
1414
private lazy var storeVisitor = IdentityMapStoreVisitor(identityMap: self)
1515

1616
/// Create a new IdentityMap instance optionally with a queue and a logger
17-
/// - Parameter queue: the queue on which to receive updates. If nil identitymap will create its own. DO NOT USE
18-
/// main thread as you may end up with data races
17+
/// - Parameter queue: the queue on which to receive updates. If nil identitymap will create its own.
1918
/// - Parameter logger: a logger to follow/debug identity internal state
2019
public convenience init(queue: DispatchQueue? = nil, logger: Logger? = nil) {
2120
self.init(registry: ObserverRegistry(queue: queue), logger: logger)
@@ -92,7 +91,7 @@ public class IdentityMap {
9291

9392
/// Store multiple entities at once
9493
public func store<C: Collection>(entities: C, named: AliasKey<C>? = nil, modifiedAt: Stamp? = nil)
95-
-> [EntityObserver<C.Element>] where C.Element: Identifiable {
94+
-> EntityObserver<[C.Element]> where C.Element: Identifiable {
9695
transaction {
9796
let nodes = entities.map { nodeStore(entity: $0, modifiedAt: modifiedAt) }
9897

@@ -101,13 +100,13 @@ public class IdentityMap {
101100
logger?.didRegisterAlias(alias)
102101
}
103102

104-
return nodes.map { EntityObserver(node: $0, registry: registry) }
103+
return EntityObserver(nodes: nodes, registry: registry)
105104
}
106105
}
107106

108107
/// store multiple aggregates at once
109108
public func store<C: Collection>(entities: C, named: AliasKey<C>? = nil, modifiedAt: Stamp? = nil)
110-
-> [EntityObserver<C.Element>] where C.Element: Aggregate {
109+
-> EntityObserver<[C.Element]> where C.Element: Aggregate {
111110
transaction {
112111
let nodes = entities.map { nodeStore(entity: $0, modifiedAt: modifiedAt) }
113112

@@ -116,7 +115,7 @@ public class IdentityMap {
116115
logger?.didRegisterAlias(alias)
117116
}
118117

119-
return nodes.map { EntityObserver(node: $0, registry: registry) }
118+
return EntityObserver(nodes: nodes, registry: registry)
120119
}
121120
}
122121

Sources/CohesionKit/Observer/AliasObserver.swift

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -38,16 +38,15 @@ extension AliasObserver {
3838
registry: ObserverRegistry,
3939
onChange: @escaping OnChangeClosure
4040
) -> Subscription {
41+
// register for current alias value
4142
var entityChangesSubscription: Subscription? = alias
4243
.value
43-
.map { node in registry.addObserver(node: node, onChange: onChange) }
44+
.map { node in registry.addObserver(node: node, initial: true, onChange: onChange) }
4445

4546
// subscribe to alias changes
4647
let subscription = alias.addObserver { node in
4748
// update entity changes subscription
48-
entityChangesSubscription = node.map { registry.addObserver(node: $0) { onChange($0) }}
49-
50-
registry.queue.async { onChange(node?.ref.value) }
49+
entityChangesSubscription = node.map { registry.addObserver(node: $0, initial: true, onChange: onChange) }
5150
}
5251

5352
return Subscription {
@@ -64,16 +63,15 @@ extension AliasObserver {
6463
registry: ObserverRegistry,
6564
onChange: @escaping OnChangeClosure
6665
) -> Subscription where T == Array<E> {
66+
// register for current alias value
6767
var entitiesChangesSubscriptions: Subscription? = alias
6868
.value
69-
.map { nodes in nodes.map { EntityObserver(node: $0, registry: registry) } }?
69+
.map { nodes in EntityObserver(nodes: nodes, registry: registry) }?
7070
.observe(onChange: onChange)
7171

7272
// Subscribe to alias ref changes and to any changes made on the ref collection nodes.
7373
let subscription = alias.addObserver { nodes in
74-
let nodeObservers = nodes?.map { EntityObserver(node: $0, registry: registry) }
75-
76-
registry.queue.async { onChange(nodeObservers?.value) }
74+
let nodeObservers = nodes.map { EntityObserver(nodes: $0, registry: registry) }
7775

7876
// update collection changes subscription
7977
entitiesChangesSubscriptions = nodeObservers?.observe(onChange: onChange)
@@ -83,6 +81,5 @@ extension AliasObserver {
8381
subscription.unsubscribe()
8482
entitiesChangesSubscriptions?.unsubscribe()
8583
}
86-
}
87-
84+
}
8885
}

Sources/CohesionKit/Observer/EntityObserver.swift

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,17 +2,27 @@ import Foundation
22

33
/// A type registering observers on a given entity from identity storage
44
public struct EntityObserver<T>: Observer {
5-
let node: EntityNode<T>
6-
let registry: ObserverRegistry
5+
public typealias OnChange = (T) -> Void
6+
77
public let value: T
88

9+
let createObserver: (@escaping OnChange) -> Subscription
10+
911
init(node: EntityNode<T>, registry: ObserverRegistry) {
10-
self.registry = registry
11-
self.node = node
12-
self.value = node.value as! T
12+
self.value = node.ref.value
13+
self.createObserver = { onChange in
14+
registry.addObserver(node: node, initial: true, onChange: onChange)
15+
}
16+
}
17+
18+
init<Element>(nodes: [EntityNode<Element>], registry: ObserverRegistry) where T == [Element] {
19+
self.value = nodes.map(\.ref.value)
20+
self.createObserver = { onChange in
21+
registry.addObserver(nodes: nodes, initial: true, onChange: onChange)
22+
}
1323
}
1424

15-
public func observe(onChange: @escaping (T) -> Void) -> Subscription {
16-
registry.addObserver(node: node, initial: true, onChange: onChange)
25+
public func observe(onChange: @escaping OnChange) -> Subscription {
26+
createObserver(onChange)
1727
}
1828
}
Lines changed: 2 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,34 +1,15 @@
11
/// A protocol allowing to observe a value returned from the `IdentityMap`
22
public protocol Observer {
33
associatedtype T
4-
4+
55
/// The value at the time the observer creation.
66
/// If you want **realtime** value use `observe to get notified of changes
77
var value: T { get }
8-
8+
99
/// Add an observer being notified when entity change.
1010
/// Alternatively you can use `asPublisher` to observe using Combine.
1111
/// - Parameter onChange: a closure called when value changed
1212
/// - Returns: a subscription to cancel observation. Observation is automatically cancelled if subscription is deinit.
1313
/// As long as the subscription is alived the entity should be kept in `IdentityMap`.
1414
func observe(onChange: @escaping (T) -> Void) -> Subscription
1515
}
16-
17-
extension Array: Observer where Element: Observer {
18-
public var value: [Element.T] { map(\.value) }
19-
20-
public func observe(onChange: @escaping ([Element.T]) -> Void) -> Subscription {
21-
var value = value
22-
23-
let subscriptions = indices.map { index in
24-
self[index].observe {
25-
value[index] = $0
26-
onChange(value)
27-
}
28-
}
29-
30-
return Subscription {
31-
subscriptions.forEach { $0.unsubscribe() }
32-
}
33-
}
34-
}

Sources/CohesionKit/Observer/ObserverRegistry.swift

Lines changed: 76 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,49 +3,60 @@ import Foundation
33
/// Registers observers associated to an ``EntityNode``.
44
/// The registry will handle notifying observers when a node is marked as changed
55
class ObserverRegistry {
6-
typealias Observer = (Any) -> Void
7-
private typealias ObserverID = Int
86
private typealias Hash = Int
97

108
let queue: DispatchQueue
11-
/// registered observers
12-
private var observers: [Hash: [ObserverID: Observer]] = [:]
13-
/// next available id for an observer
14-
private var nextObserverID: ObserverID = 0
9+
/// registered observer handlers
10+
private var handlers: [Hash: Set<Handler>] = [:]
1511
/// nodes waiting for notifiying their observes about changes
1612
private var pendingChanges: [Hash: AnyWeak] = [:]
1713

1814
init(queue: DispatchQueue? = nil) {
19-
self.queue = queue ?? DispatchQueue(label: "com.cohesionkit.registry")
15+
self.queue = queue ?? DispatchQueue.main
2016
}
2117

2218
/// register an observer to observe changes on an entity node. Everytime `ObserverRegistry` is notified about changes
2319
/// to this node `onChange` will be called.
2420
func addObserver<T>(node: EntityNode<T>, initial: Bool = false, onChange: @escaping (T) -> Void) -> Subscription {
25-
let observerID = generateID()
21+
let handler = Handler { onChange($0.ref.value) }
2622

27-
observers[node.hashValue, default: [:]][observerID] = {
28-
guard let newValue = $0 as? EntityNode<T> else {
29-
return
23+
if initial {
24+
if queue == DispatchQueue.main && Thread.isMainThread {
25+
onChange(node.ref.value)
26+
}
27+
else {
28+
queue.sync {
29+
onChange(node.ref.value)
3030
}
31+
}
32+
}
3133

32-
onChange(newValue.ref.value)
34+
return subscribeHandler(handler, for: node)
35+
}
36+
37+
/// Add an observer handler to multiple nodes.
38+
/// Note that the same handler will be added to each nodes. But it should get notified only once per transaction
39+
func addObserver<T>(nodes: [EntityNode<T>], initial: Bool = false, onChange: @escaping ([T]) -> Void) -> Subscription {
40+
let handler = Handler { (_: EntityNode<T>) in
41+
// use last value from nodes
42+
onChange(nodes.map(\.ref.value))
3343
}
3444

3545
if initial {
3646
if queue == DispatchQueue.main && Thread.isMainThread {
37-
onChange(node.ref.value)
47+
onChange(nodes.map(\.ref.value))
3848
}
3949
else {
4050
queue.sync {
41-
onChange(node.ref.value)
51+
onChange(nodes.map(\.ref.value))
4252
}
4353
}
4454
}
4555

46-
// subscription keeps a strong ref to node, avoiding it from being released somehow while suscription is running
47-
return Subscription { [node] in
48-
self.observers[node.hashValue]?.removeValue(forKey: observerID)
56+
let subscriptions = nodes.map { node in subscribeHandler(handler, for: node) }
57+
58+
return Subscription {
59+
subscriptions.forEach { $0.unsubscribe() }
4960
}
5061
}
5162

@@ -61,26 +72,67 @@ class ObserverRegistry {
6172
/// Notify observers of all queued changes. Once notified pending changes are cleared out.
6273
func postChanges() {
6374
let changes = pendingChanges
64-
let observers = self.observers
75+
let handlers = self.handlers
76+
var executedHandlers: Set<Handler> = []
6577

6678
self.pendingChanges = [:]
6779

68-
queue.async { [unowned self] in
80+
queue.async {
6981
for (hashKey, weakNode) in changes {
7082
// node was released: no one to notify
7183
guard let node = weakNode.unwrap() else {
7284
continue
7385
}
7486

75-
observers[hashKey]?.forEach { (_, observer) in
76-
observer(node)
87+
for handler in handlers[hashKey] ?? [] {
88+
// if some handlers are used multiple times (like for collections), make sure we don't notify them multiple times
89+
guard !executedHandlers.contains(handler) else {
90+
continue
91+
}
92+
93+
handler(node)
94+
executedHandlers.insert(handler)
7795
}
7896
}
7997
}
8098
}
8199

82-
private func generateID() -> ObserverID {
83-
defer { nextObserverID &+= 1 }
84-
return nextObserverID
100+
private func subscribeHandler<T>(_ handler: Handler, for node: EntityNode<T>) -> Subscription {
101+
handlers[node.hashValue, default: []].insert(handler)
102+
103+
// subscription keeps a strong ref to node, avoiding it from being released somehow while suscription is running
104+
return Subscription { [node] in
105+
self.handlers[node.hashValue]?.remove(handler)
106+
}
85107
}
86108
}
109+
110+
extension ObserverRegistry {
111+
/// Handle observation for a given node
112+
class Handler: Hashable {
113+
let executor: (Any) -> Void
114+
115+
init<T>(executor: @escaping (EntityNode<T>) -> Void) {
116+
self.executor = {
117+
guard let entity = $0 as? EntityNode<T> else {
118+
return
119+
}
120+
121+
executor(entity)
122+
}
123+
}
124+
125+
/// execute the handler if `executeAtMost` does not exceed `executeCount`
126+
func callAsFunction(_ value: Any) {
127+
executor(value)
128+
}
129+
130+
static func == (lhs: Handler, rhs: Handler) -> Bool {
131+
ObjectIdentifier(lhs) == ObjectIdentifier(rhs)
132+
}
133+
134+
func hash(into hasher: inout Hasher) {
135+
hasher.combine(ObjectIdentifier(self))
136+
}
137+
}
138+
}

Tests/CohesionKitTests/IdentityMapTests.swift

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -268,10 +268,16 @@ extension IdentityMapTests {
268268
let identityMap = IdentityMap(queue: .main)
269269
let newEntity = SingleNodeFixture(id: 2)
270270
let expectation = XCTestExpectation()
271+
var firstDropped = false
271272

272273
_ = identityMap.store(entity: SingleNodeFixture(id: 1), named: .test, modifiedAt: 0)
273274

274275
let subscription = identityMap.find(named: .test).observe {
276+
guard firstDropped else {
277+
firstDropped = true
278+
return
279+
}
280+
275281
expectation.fulfill()
276282
XCTAssertEqual($0, newEntity)
277283
}
@@ -289,10 +295,16 @@ extension IdentityMapTests {
289295
let identityMap = IdentityMap(queue: .main)
290296
let entities = [SingleNodeFixture(id: 1)]
291297
let expectation = XCTestExpectation()
298+
var firstDropped = false
292299

293300
_ = identityMap.store(entities: [], named: .listOfNodes, modifiedAt: 0)
294301

295302
let subscription = identityMap.find(named: .listOfNodes).observe {
303+
guard firstDropped else {
304+
firstDropped = true
305+
return
306+
}
307+
296308
expectation.fulfill()
297309
XCTAssertEqual($0, entities)
298310
}
@@ -313,4 +325,4 @@ private extension AliasKey where T == SingleNodeFixture {
313325

314326
private extension AliasKey where T == [SingleNodeFixture] {
315327
static let listOfNodes = AliasKey(named: "listOfNodes")
316-
}
328+
}

0 commit comments

Comments
 (0)