Skip to content

Commit c6b4010

Browse files
committed
fix(shareLatest): properly closing sync observables
1 parent 51148d1 commit c6b4010

File tree

2 files changed

+45
-30
lines changed

2 files changed

+45
-30
lines changed

packages/core/src/internal/share-latest.ts

Lines changed: 22 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Observable, Subscription, Subject, noop } from "rxjs"
1+
import { Observable, Subscription, Subject, noop, Subscriber } from "rxjs"
22
import { BehaviorObservable } from "./BehaviorObservable"
33
import { EMPTY_VALUE } from "./empty-value"
44
import { SUSPENSE } from "../SUSPENSE"
@@ -10,7 +10,7 @@ const shareLatest = <T>(
1010
teardown = noop,
1111
): BehaviorObservable<T> => {
1212
let subject: Subject<T> | null
13-
let subscription: Subscription | null
13+
let subscription: Subscriber<T> | null
1414
let refCount = 0
1515
let currentValue: T = EMPTY_VALUE
1616
let promise: Promise<T> | null
@@ -29,15 +29,31 @@ const shareLatest = <T>(
2929

3030
refCount++
3131
let innerSub: Subscription
32+
33+
subscriber.add(() => {
34+
refCount--
35+
innerSub.unsubscribe()
36+
if (refCount === 0) {
37+
currentValue = EMPTY_VALUE
38+
if (subscription) {
39+
subscription.unsubscribe()
40+
}
41+
teardown()
42+
subject = null
43+
subscription = null
44+
promise = null
45+
}
46+
})
47+
3248
if (!subject) {
3349
subject = new Subject<T>()
3450
innerSub = subject.subscribe(subscriber)
3551
subscription = null
36-
subscription = source$.subscribe(
37-
(value) => {
52+
subscription = new Subscriber<T>(
53+
(value: T) => {
3854
subject!.next((currentValue = value))
3955
},
40-
(err) => {
56+
(err: any) => {
4157
const _subject = subject
4258
subscription = null
4359
subject = null
@@ -49,29 +65,14 @@ const shareLatest = <T>(
4965
subject!.complete()
5066
},
5167
)
52-
if (subscription.closed) subscription = null
68+
source$.subscribe(subscription)
5369
emitIfEmpty()
5470
} else {
5571
innerSub = subject.subscribe(subscriber)
5672
if (currentValue !== EMPTY_VALUE) {
5773
subscriber.next(currentValue)
5874
}
5975
}
60-
61-
return () => {
62-
refCount--
63-
innerSub.unsubscribe()
64-
if (refCount === 0) {
65-
currentValue = EMPTY_VALUE
66-
if (subscription) {
67-
subscription.unsubscribe()
68-
}
69-
teardown()
70-
subject = null
71-
subscription = null
72-
promise = null
73-
}
74-
}
7576
}) as BehaviorObservable<T>
7677

7778
let error: any = EMPTY_VALUE

packages/core/src/share-latest.test.ts

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { TestScheduler } from "rxjs/testing"
2-
import { from, merge, defer } from "rxjs"
2+
import { from, merge, defer, Observable, noop } from "rxjs"
33
import { shareLatest } from "./"
4-
import { withLatestFrom, startWith, map } from "rxjs/operators"
4+
import { withLatestFrom, startWith, map, take } from "rxjs/operators"
55

66
const scheduler = () =>
77
new TestScheduler((actual, expected) => {
@@ -75,15 +75,29 @@ describe("shareLatest", () => {
7575

7676
// prettier-ignore
7777
it("should not skip values on a sync source", () => {
78-
scheduler().run(({ expectObservable }) => {
79-
const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)")
80-
const sub1 = '^';
81-
const expected1 = " (abcd|)"
78+
scheduler().run(({ expectObservable }) => {
79+
const source = from(['a', 'b', 'c', 'd']) // cold("(abcd|)")
80+
const sub1 = '^';
81+
const expected1 = " (abcd|)"
8282

83-
const shared = shareLatest()(source);
83+
const shared = shareLatest()(source);
8484

85-
expectObservable(shared, sub1).toBe(expected1);
85+
expectObservable(shared, sub1).toBe(expected1);
86+
})
87+
})
88+
89+
it("should stop listening to a synchronous observable when unsubscribed", () => {
90+
let sideEffects = 0
91+
const synchronousObservable = new Observable<number>((subscriber) => {
92+
// This will check to see if the subscriber was closed on each loop
93+
// when the unsubscribe hits (from the `take`), it should be closed
94+
for (let i = 0; !subscriber.closed && i < 10; i++) {
95+
sideEffects++
96+
subscriber.next(i)
97+
}
98+
})
99+
synchronousObservable.pipe(shareLatest(), take(3)).subscribe(noop)
100+
expect(sideEffects).toBe(3)
86101
})
87-
})
88102
})
89103
})

0 commit comments

Comments
 (0)