Skip to content

Commit ce8de84

Browse files
authored
Added referenceCounted() for Multicast publishers
- Adds `referenceCounted()` operator for Multicast publishers - Moves `share(replay:)` to use `referenceCounted()` instead of `autoconnect()` – matching wider reactive community implementations.
1 parent 8be24a5 commit ce8de84

File tree

4 files changed

+316
-2
lines changed

4 files changed

+316
-2
lines changed
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
//
2+
// Entwine
3+
// https://github.com/tcldr/Entwine
4+
//
5+
// Copyright © 2019 Tristan Celder. All rights reserved.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
import Combine
26+
27+
// MARK: - Publisher
28+
29+
extension Publishers {
30+
31+
/// Automates the process of connecting to a multicast publisher. Connects when the first
32+
/// subscriber connects then cancels and discards when the subscriber count falls to zero.
33+
public final class ReferenceCounted<Upstream: Publisher, SubjectType: Subject>: Publisher
34+
where Upstream.Output == SubjectType.Output, Upstream.Failure == SubjectType.Failure
35+
{
36+
public typealias Output = Upstream.Output
37+
public typealias Failure = Upstream.Failure
38+
39+
private let upstream: Upstream
40+
private let createSubject: () -> SubjectType
41+
private weak var sharedUpstreamReference: Publishers.Autoconnect<Publishers.Multicast<Upstream, SubjectType>>?
42+
43+
init(upstream: Upstream, createSubject: @escaping () -> SubjectType) {
44+
self.upstream = upstream
45+
self.createSubject = createSubject
46+
}
47+
48+
public func receive<S: Subscriber>(subscriber: S) where Failure == S.Failure, Output == S.Input {
49+
let sharedUpstream = sharedUpstreamPublisher()
50+
sharedUpstream.subscribe(ReferenceCountedSink(upstream: sharedUpstream, downstream: subscriber))
51+
}
52+
53+
func sharedUpstreamPublisher() -> Publishers.Autoconnect<Publishers.Multicast<Upstream, SubjectType>> {
54+
guard let shared = sharedUpstreamReference else {
55+
let shared = upstream.multicast(createSubject).autoconnect()
56+
self.sharedUpstreamReference = shared
57+
return shared
58+
}
59+
return shared
60+
}
61+
}
62+
63+
// MARK: - Sink
64+
65+
fileprivate final class ReferenceCountedSink<Upstream: Publisher, Downstream: Subscriber>: Subscriber
66+
where Upstream.Output == Downstream.Input, Upstream.Failure == Downstream.Failure
67+
{
68+
typealias Input = Downstream.Input
69+
typealias Failure = Downstream.Failure
70+
71+
private let upstream: Upstream
72+
private let downstream: Downstream
73+
74+
init(upstream: Upstream, downstream: Downstream) {
75+
self.upstream = upstream
76+
self.downstream = downstream
77+
}
78+
79+
func receive(subscription: Subscription) {
80+
downstream.receive(subscription: ReferenceCountedSubscription(wrappedSubscription: subscription, sink: self))
81+
}
82+
83+
func receive(_ input: Input) -> Subscribers.Demand {
84+
downstream.receive(input)
85+
}
86+
87+
func receive(completion: Subscribers.Completion<Failure>) {
88+
downstream.receive(completion: completion)
89+
}
90+
}
91+
92+
fileprivate final class ReferenceCountedSubscription<Sink: Subscriber>: Subscription {
93+
94+
let wrappedSubscription: Subscription
95+
var sink: Sink?
96+
97+
init(wrappedSubscription: Subscription, sink: Sink) {
98+
self.wrappedSubscription = wrappedSubscription
99+
self.sink = sink
100+
}
101+
102+
func request(_ demand: Subscribers.Demand) {
103+
wrappedSubscription.request(demand)
104+
}
105+
106+
func cancel() {
107+
wrappedSubscription.cancel()
108+
sink = nil
109+
}
110+
}
111+
}
112+
113+
// MARK: - Operator
114+
115+
extension Publishers.Multicast {
116+
117+
/// Automates the process of connecting to a multicast publisher. Connects when the first
118+
/// subscriber connects then cancels and discards when the subscriber count falls to zero.
119+
///
120+
/// - Returns: A publisher which automatically connects to its upstream multicast publisher.
121+
public func referenceCounted() -> Publishers.ReferenceCounted<Upstream, SubjectType> {
122+
.init(upstream: upstream, createSubject: createSubject)
123+
}
124+
}

Sources/Entwine/Operators/ShareReplay.swift

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ extension Publisher {
3636
/// replay to new subscribers
3737
/// - Returns: A class instance that republishes its upstream publisher and maintains a
3838
/// buffer of its latest values for replay to new subscribers
39-
public func share(replay maxBufferSize: Int) -> Publishers.Autoconnect<Publishers.Multicast<Self, ReplaySubject<Output, Failure>>> {
40-
multicast(subject: ReplaySubject<Output, Failure>(maxBufferSize: maxBufferSize)).autoconnect()
39+
public func share(replay maxBufferSize: Int) -> Publishers.ReferenceCounted<Self, ReplaySubject<Self.Output, Self.Failure>> {
40+
multicast { ReplaySubject<Output, Failure>(maxBufferSize: maxBufferSize) }.referenceCounted()
4141
}
4242
}
Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
//
2+
// Entwine
3+
// https://github.com/tcldr/Entwine
4+
//
5+
// Copyright © 2019 Tristan Celder. All rights reserved.
6+
//
7+
// Permission is hereby granted, free of charge, to any person obtaining a copy
8+
// of this software and associated documentation files (the "Software"), to deal
9+
// in the Software without restriction, including without limitation the rights
10+
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
// copies of the Software, and to permit persons to whom the Software is
12+
// furnished to do so, subject to the following conditions:
13+
//
14+
// The above copyright notice and this permission notice shall be included in
15+
// all copies or substantial portions of the Software.
16+
//
17+
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
23+
// THE SOFTWARE.
24+
25+
import XCTest
26+
import Combine
27+
28+
@testable import Entwine
29+
@testable import EntwineTest
30+
31+
final class ReferenceCountedTests: XCTestCase {
32+
33+
// MARK: - Properties
34+
35+
private var scheduler: TestScheduler!
36+
37+
// MARK: - Per test set-up and tear-down
38+
39+
override func setUp() {
40+
scheduler = TestScheduler(initialClock: 0)
41+
}
42+
43+
// MARK: - Tests
44+
45+
func testAutoConnectsAndPassesThroughInitialValue() {
46+
47+
let passthrough = PassthroughSubject<Int, Never>()
48+
let subject = passthrough.prepend(-1).share()
49+
50+
let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
51+
52+
scheduler.schedule(after: 100) { subject.subscribe(results1) }
53+
scheduler.schedule(after: 200) { passthrough.send(0) }
54+
scheduler.schedule(after: 210) { passthrough.send(1) }
55+
56+
scheduler.resume()
57+
58+
let expected: TestSequence<Int, Never> = [
59+
(100, .subscription),
60+
(100, .input(-1)),
61+
(200, .input( 0)),
62+
(210, .input( 1)),
63+
]
64+
65+
XCTAssertEqual(expected, results1.recordedOutput)
66+
}
67+
68+
func testPassesThroughInitialValueToFirstSubscriberOnly() {
69+
70+
let passthrough = PassthroughSubject<Int, Never>()
71+
let subject = passthrough.prepend(-1).share()
72+
73+
let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
74+
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)
75+
76+
scheduler.schedule(after: 100) { subject.subscribe(results1) }
77+
scheduler.schedule(after: 110) { subject.subscribe(results2) }
78+
scheduler.schedule(after: 200) { passthrough.send(0) }
79+
scheduler.schedule(after: 210) { passthrough.send(1) }
80+
81+
scheduler.resume()
82+
83+
let expected2: TestSequence<Int, Never> = [
84+
(110, .subscription),
85+
(200, .input( 0)),
86+
(210, .input( 1)),
87+
]
88+
89+
XCTAssertEqual(expected2, results2.recordedOutput)
90+
}
91+
92+
func testResetsWhenReferenceCountReachesZero() {
93+
94+
let passthrough = PassthroughSubject<Int, Never>()
95+
let subject = passthrough.prepend(-1).share()
96+
97+
let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
98+
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)
99+
let results3 = scheduler.createTestableSubscriber(Int.self, Never.self)
100+
101+
scheduler.schedule(after: 100) { subject.subscribe(results1) }
102+
scheduler.schedule(after: 110) { subject.subscribe(results2) }
103+
scheduler.schedule(after: 200) { passthrough.send(0) }
104+
scheduler.schedule(after: 210) { passthrough.send(1) }
105+
scheduler.schedule(after: 300) { results1.cancel() }
106+
scheduler.schedule(after: 310) { results2.cancel() }
107+
scheduler.schedule(after: 400) { subject.subscribe(results3) }
108+
scheduler.schedule(after: 500) { passthrough.send(0) }
109+
scheduler.schedule(after: 510) { passthrough.send(1) }
110+
111+
scheduler.resume()
112+
113+
let expected3: TestSequence<Int, Never> = [
114+
(400, .subscription),
115+
(400, .input(-1)),
116+
(500, .input( 0)),
117+
(510, .input( 1)),
118+
]
119+
120+
XCTAssertEqual(expected3, results3.recordedOutput)
121+
}
122+
123+
func testMulticastCreateSubjectCalledWhenSubscriberCountGoesFromZeroToOne() {
124+
125+
var cancellables = Set<AnyCancellable>()
126+
let factory: () -> PassthroughSubject<Int, Never> = { Swift.print("createSubject()"); return PassthroughSubject() }
127+
let sut = Just(1)
128+
sut.multicast(factory).autoconnect().sink { print("A:\($0)") }.store(in: &cancellables)
129+
cancellables = Set<AnyCancellable>()
130+
sut.multicast(factory).autoconnect().sink { print("B:\($0)") }.store(in: &cancellables)
131+
}
132+
}

Tests/EntwineTests/ShareReplayTests.swift

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -229,4 +229,62 @@ final class ShareReplayTests: XCTestCase {
229229

230230
XCTAssertEqual(expected2, results2.recordedOutput)
231231
}
232+
233+
func testPassesThroughInitialValueToFirstSubscriberOnly() {
234+
235+
let passthrough = PassthroughSubject<Int, Never>()
236+
let subject = passthrough.prepend(-1).share()
237+
238+
let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
239+
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)
240+
241+
scheduler.schedule(after: 100) { subject.subscribe(results1) }
242+
scheduler.schedule(after: 110) { subject.subscribe(results2) }
243+
scheduler.schedule(after: 200) { passthrough.send(0) }
244+
scheduler.schedule(after: 210) { passthrough.send(1) }
245+
246+
scheduler.resume()
247+
248+
let expected2: TestSequence<Int, Never> = [
249+
(110, .subscription),
250+
(200, .input( 0)),
251+
(210, .input( 1)),
252+
]
253+
254+
XCTAssertEqual(expected2, results2.recordedOutput)
255+
}
256+
257+
func testResetsWhenReferenceCountReachesZero() {
258+
259+
let passthrough = PassthroughSubject<Int, Never>()
260+
let subject = passthrough.prepend(-1).share(replay: 2)
261+
262+
let results1 = scheduler.createTestableSubscriber(Int.self, Never.self)
263+
let results2 = scheduler.createTestableSubscriber(Int.self, Never.self)
264+
265+
scheduler.schedule(after: 100) { subject.subscribe(results1) }
266+
scheduler.schedule(after: 110) { passthrough.send(0) }
267+
scheduler.schedule(after: 200) { results1.cancel() }
268+
scheduler.schedule(after: 200) { subject.subscribe(results2) }
269+
scheduler.schedule(after: 300) { passthrough.send(5) }
270+
scheduler.schedule(after: 310) { passthrough.send(6) }
271+
272+
scheduler.resume()
273+
274+
let expected1: TestSequence<Int, Never> = [
275+
(100, .subscription),
276+
(100, .input(-1)),
277+
(110, .input( 0)),
278+
]
279+
280+
let expected2: TestSequence<Int, Never> = [
281+
(200, .subscription),
282+
(200, .input(-1)),
283+
(300, .input( 5)),
284+
(310, .input( 6)),
285+
]
286+
287+
XCTAssertEqual(expected1, results1.recordedOutput)
288+
XCTAssertEqual(expected2, results2.recordedOutput)
289+
}
232290
}

0 commit comments

Comments
 (0)