|
1 |
| -import { NEVER, Observable, Subject } from "rxjs" |
2 |
| -import { switchMap, take } from "rxjs/operators" |
| 1 | +import { concat, NEVER, Observable, of, Subject } from "rxjs" |
| 2 | +import { catchError, switchMap, take } from "rxjs/operators" |
3 | 3 | import { TestScheduler } from "rxjs/testing"
|
4 | 4 | import { partitionByKey } from "./"
|
5 | 5 |
|
@@ -58,6 +58,33 @@ describe("partitionByKey", () => {
|
58 | 58 | })
|
59 | 59 | })
|
60 | 60 | })
|
| 61 | + |
| 62 | + it("keeps the existing keys alive when the source completes", () => { |
| 63 | + scheduler().run(({ expectObservable, cold }) => { |
| 64 | + const source = cold("-ab-|") |
| 65 | + const a = cold(" --1---2-|") |
| 66 | + const b = cold(" ---|") |
| 67 | + const expectedStr = "efg--h---(i|)" |
| 68 | + const innerStreams = { a, b } |
| 69 | + const [, result] = partitionByKey( |
| 70 | + source, |
| 71 | + (v) => v, |
| 72 | + (v$) => |
| 73 | + v$.pipe( |
| 74 | + take(1), |
| 75 | + switchMap((v) => innerStreams[v]), |
| 76 | + ), |
| 77 | + ) |
| 78 | + |
| 79 | + expectObservable(result).toBe(expectedStr, { |
| 80 | + e: [], |
| 81 | + f: ["a"], |
| 82 | + g: ["a", "b"], |
| 83 | + h: ["a"], |
| 84 | + i: [], |
| 85 | + }) |
| 86 | + }) |
| 87 | + }) |
61 | 88 | })
|
62 | 89 |
|
63 | 90 | describe("getInstance$", () => {
|
@@ -112,5 +139,95 @@ describe("partitionByKey", () => {
|
112 | 139 | expect(lateNext).toHaveBeenCalledTimes(1)
|
113 | 140 | expect(lateNext).toHaveBeenCalledWith(1)
|
114 | 141 | })
|
| 142 | + |
| 143 | + it("unsubscribes from all streams when refcount reaches 0", () => { |
| 144 | + let innerSubs = 0 |
| 145 | + const inner = new Observable<number>(() => { |
| 146 | + innerSubs++ |
| 147 | + return () => { |
| 148 | + innerSubs-- |
| 149 | + } |
| 150 | + }) |
| 151 | + |
| 152 | + const sourceSubject = new Subject<number>() |
| 153 | + let sourceSubs = 0 |
| 154 | + const source = new Observable<number>((obs) => { |
| 155 | + sourceSubs++ |
| 156 | + sourceSubject.subscribe(obs) |
| 157 | + return () => { |
| 158 | + sourceSubs-- |
| 159 | + } |
| 160 | + }) |
| 161 | + |
| 162 | + const [getObs] = partitionByKey( |
| 163 | + source, |
| 164 | + (v) => v, |
| 165 | + () => inner, |
| 166 | + ) |
| 167 | + const observable = getObs(1) |
| 168 | + |
| 169 | + expect(sourceSubs).toBe(0) |
| 170 | + expect(innerSubs).toBe(0) |
| 171 | + |
| 172 | + const sub1 = observable.subscribe() |
| 173 | + |
| 174 | + expect(sourceSubs).toBe(1) |
| 175 | + expect(innerSubs).toBe(0) |
| 176 | + |
| 177 | + sourceSubject.next(1) |
| 178 | + |
| 179 | + expect(sourceSubs).toBe(1) |
| 180 | + expect(innerSubs).toBe(1) |
| 181 | + |
| 182 | + const sub2 = observable.subscribe() |
| 183 | + |
| 184 | + expect(sourceSubs).toBe(1) |
| 185 | + expect(innerSubs).toBe(1) |
| 186 | + |
| 187 | + sub1.unsubscribe() |
| 188 | + |
| 189 | + expect(sourceSubs).toBe(1) |
| 190 | + expect(innerSubs).toBe(1) |
| 191 | + |
| 192 | + sub2.unsubscribe() |
| 193 | + |
| 194 | + expect(sourceSubs).toBe(0) |
| 195 | + expect(innerSubs).toBe(0) |
| 196 | + }) |
| 197 | + |
| 198 | + it("emits a complete on the inner observable when the source completes", () => { |
| 199 | + scheduler().run(({ expectObservable, cold }) => { |
| 200 | + const source = cold("-ab-a-|") |
| 201 | + const expectA = " -a--a-(c|)" |
| 202 | + const expectB = " --b---(c|)" |
| 203 | + |
| 204 | + const [getInstance$] = partitionByKey( |
| 205 | + source, |
| 206 | + (v) => v, |
| 207 | + (v$) => concat(v$, ["c"]), |
| 208 | + ) |
| 209 | + |
| 210 | + expectObservable(getInstance$("a")).toBe(expectA) |
| 211 | + expectObservable(getInstance$("b")).toBe(expectB) |
| 212 | + }) |
| 213 | + }) |
| 214 | + |
| 215 | + // Do we want this behaviour? |
| 216 | + it("emits an error when the source errors", () => { |
| 217 | + scheduler().run(({ expectObservable, cold }) => { |
| 218 | + const source = cold("-ab-a-#") |
| 219 | + const expectA = " -a--a-#" |
| 220 | + const expectB = " --b---#" |
| 221 | + |
| 222 | + const [getInstance$] = partitionByKey( |
| 223 | + source, |
| 224 | + (v) => v, |
| 225 | + (v$) => v$.pipe(catchError(() => of("e"))), |
| 226 | + ) |
| 227 | + |
| 228 | + expectObservable(getInstance$("a")).toBe(expectA) |
| 229 | + expectObservable(getInstance$("b")).toBe(expectB) |
| 230 | + }) |
| 231 | + }) |
115 | 232 | })
|
116 | 233 | })
|
0 commit comments