Skip to content

Commit 0c251ef

Browse files
authored
fix(utils/selfDependent): prevent subject from being closed after unsub (#283)
1 parent 6aa83dd commit 0c251ef

File tree

2 files changed

+84
-5
lines changed

2 files changed

+84
-5
lines changed

packages/utils/src/selfDependent.test.ts

Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import {
66
takeWhile,
77
switchMapTo,
88
delay,
9+
startWith,
910
} from "rxjs/operators"
1011
import { TestScheduler } from "rxjs/testing"
1112
import { selfDependent } from "."
@@ -53,4 +54,64 @@ describe("selfDependent", () => {
5354
expectSubscriptions((source as any).subscriptions).toBe(sourceSub)
5455
})
5556
})
57+
58+
it("works after unsubscription and re-subscription", () => {
59+
scheduler().run(({ expectObservable, cold }) => {
60+
const source = cold("abcde")
61+
const sourceSub1 = " ^--!"
62+
const expected1 = " abc"
63+
const sourceSub2 = " -----^---!"
64+
const expected2 = " -----abcd"
65+
66+
const [lastValue$, connect] = selfDependent<string>()
67+
const result$ = source.pipe(
68+
withLatestFrom(lastValue$.pipe(startWith(""))),
69+
map(([v]) => v),
70+
connect(),
71+
)
72+
73+
expectObservable(result$, sourceSub1).toBe(expected1)
74+
expectObservable(result$, sourceSub2).toBe(expected2)
75+
})
76+
})
77+
78+
it("works after complete and re-subscription", () => {
79+
scheduler().run(({ expectObservable, cold }) => {
80+
const source = cold("abc|")
81+
const sourceSub1 = " ^---!"
82+
const expected1 = " abc|"
83+
const sourceSub2 = " -----^---!"
84+
const expected2 = " -----abc|"
85+
86+
const [lastValue$, connect] = selfDependent<string>()
87+
const result$ = source.pipe(
88+
withLatestFrom(lastValue$.pipe(startWith(""))),
89+
map(([v]) => v),
90+
connect(),
91+
)
92+
93+
expectObservable(result$, sourceSub1).toBe(expected1)
94+
expectObservable(result$, sourceSub2).toBe(expected2)
95+
})
96+
})
97+
98+
it("works after error and re-subscription", () => {
99+
scheduler().run(({ expectObservable, cold }) => {
100+
const source = cold("abc#")
101+
const sourceSub1 = " ^---!"
102+
const expected1 = " abc#"
103+
const sourceSub2 = " -----^---!"
104+
const expected2 = " -----abc#"
105+
106+
const [lastValue$, connect] = selfDependent<string>()
107+
const result$ = source.pipe(
108+
withLatestFrom(lastValue$.pipe(startWith(""))),
109+
map(([v]) => v),
110+
connect(),
111+
)
112+
113+
expectObservable(result$, sourceSub1).toBe(expected1)
114+
expectObservable(result$, sourceSub2).toBe(expected2)
115+
})
116+
})
56117
})

packages/utils/src/selfDependent.ts

Lines changed: 23 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,10 @@
1-
import { Observable, Subject, MonoTypeOperatorFunction } from "rxjs"
2-
import { tap } from "rxjs/operators"
1+
import {
2+
Observable,
3+
Subject,
4+
MonoTypeOperatorFunction,
5+
BehaviorSubject,
6+
} from "rxjs"
7+
import { switchAll, tap } from "rxjs/operators"
38

49
/**
510
* A creation operator that helps at creating observables that have circular
@@ -13,10 +18,23 @@ export const selfDependent = <T>(): [
1318
Observable<T>,
1419
() => MonoTypeOperatorFunction<T>,
1520
] => {
16-
const mirrored$ = new Subject<T>()
21+
const activeSubject: BehaviorSubject<Subject<T>> = new BehaviorSubject(
22+
new Subject<T>(),
23+
)
1724
return [
18-
mirrored$.asObservable(),
19-
() => tap(mirrored$) as MonoTypeOperatorFunction<T>,
25+
activeSubject.pipe(switchAll()),
26+
() =>
27+
tap({
28+
next: (v) => activeSubject.value.next(v),
29+
error: (e) => {
30+
activeSubject.value.error(e)
31+
activeSubject.next(new Subject<T>())
32+
},
33+
complete: () => {
34+
activeSubject.value.complete()
35+
activeSubject.next(new Subject<T>())
36+
},
37+
}) as MonoTypeOperatorFunction<T>,
2038
]
2139
}
2240

0 commit comments

Comments
 (0)