Skip to content

Commit 6fe91f8

Browse files
volivajosepot
andauthored
feat: utils toKeySet (#252)
* feat: utils `toKeySet` * Update packages/utils/src/toKeySet.ts Co-authored-by: Josep M Sobrepere <jm.sobrepere@gmail.com> * Update packages/utils/src/toKeySet.test.ts Co-authored-by: Josep M Sobrepere <jm.sobrepere@gmail.com> * organize test imports Co-authored-by: Josep M Sobrepere <jm.sobrepere@gmail.com>
1 parent ec8f8ad commit 6fe91f8

File tree

3 files changed

+127
-0
lines changed

3 files changed

+127
-0
lines changed

packages/utils/src/index.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ export { createSignal } from "./createSignal"
33
export { createKeyedSignal } from "./createKeyedSignal"
44
export { mergeWithKey } from "./mergeWithKey"
55
export { partitionByKey, KeyChanges } from "./partitionByKey"
6+
export { toKeySet } from "./toKeySet"
67
export { suspend } from "./suspend"
78
export { suspended } from "./suspended"
89
export { switchMapSuspended } from "./switchMapSuspended"

packages/utils/src/toKeySet.test.ts

Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
import { asapScheduler, map, observeOn, of, Subject } from "rxjs"
2+
import { TestScheduler } from "rxjs/testing"
3+
import { KeyChanges, toKeySet } from "./"
4+
5+
const scheduler = () =>
6+
new TestScheduler((actual, expected) => {
7+
expect(actual).toEqual(expected)
8+
})
9+
10+
describe("toKeySet", () => {
11+
it("transforms key changes to a Set", () => {
12+
scheduler().run(({ expectObservable, cold }) => {
13+
const expectedStr = " xe--f-g--h#"
14+
const source$ = cold<KeyChanges<string>>("-a--b-c--d#", {
15+
a: {
16+
type: "add",
17+
keys: ["a", "b"],
18+
},
19+
b: {
20+
type: "remove",
21+
keys: ["b", "c"],
22+
},
23+
c: {
24+
type: "add",
25+
keys: ["c"],
26+
},
27+
d: {
28+
type: "remove",
29+
keys: ["a"],
30+
},
31+
})
32+
33+
const result$ = source$.pipe(
34+
toKeySet(),
35+
map((s) => Array.from(s)),
36+
)
37+
38+
expectObservable(result$).toBe(expectedStr, {
39+
x: [],
40+
e: ["a", "b"],
41+
f: ["a"],
42+
g: ["a", "c"],
43+
h: ["c"],
44+
})
45+
})
46+
})
47+
48+
it("emits synchronously on the first subscribe if it receives a synchronous change", () => {
49+
const emissions: string[][] = []
50+
of<KeyChanges<string>>({
51+
type: "add",
52+
keys: ["a", "b"],
53+
})
54+
.pipe(toKeySet())
55+
.subscribe((next) => emissions.push(Array.from(next)))
56+
57+
expect(emissions.length).toBe(1)
58+
expect(emissions[0]).toEqual(["a", "b"])
59+
})
60+
61+
it("emits synchronously an empty Set if it doesn't receive a synchronous change", () => {
62+
const emissions: string[][] = []
63+
of<KeyChanges<string>>({
64+
type: "add",
65+
keys: ["a", "b"],
66+
})
67+
.pipe(observeOn(asapScheduler), toKeySet())
68+
.subscribe((next) => emissions.push(Array.from(next)))
69+
70+
expect(emissions.length).toBe(1)
71+
expect(emissions[0]).toEqual([])
72+
})
73+
74+
it("resets the Set after unsubscribing", () => {
75+
const input$ = new Subject<KeyChanges<string>>()
76+
const result$ = input$.pipe(toKeySet())
77+
78+
let emissions: string[][] = []
79+
let sub = result$.subscribe((v) => emissions.push(Array.from(v)))
80+
input$.next({
81+
type: "add",
82+
keys: ["a"],
83+
})
84+
expect(emissions.length).toBe(2) // [0] is initial empty []
85+
expect(emissions[1]).toEqual(["a"])
86+
sub.unsubscribe()
87+
88+
emissions = []
89+
sub = result$.subscribe((v) => emissions.push(Array.from(v)))
90+
input$.next({
91+
type: "add",
92+
keys: ["b"],
93+
})
94+
expect(emissions.length).toBe(2) // [0] is initial empty []
95+
expect(emissions[1]).toEqual(["b"])
96+
sub.unsubscribe()
97+
})
98+
})

packages/utils/src/toKeySet.ts

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
import { Observable, OperatorFunction } from "rxjs"
2+
import { KeyChanges } from "./partitionByKey"
3+
4+
export function toKeySet<K>(): OperatorFunction<KeyChanges<K>, Set<K>> {
5+
return (source$) =>
6+
new Observable<Set<K>>((observer) => {
7+
const result = new Set<K>()
8+
let pristine = true
9+
const subscription = source$.subscribe({
10+
next({ type, keys }) {
11+
const action = type === "add" ? type : "delete"
12+
for (let k of keys) {
13+
result[action](k)
14+
}
15+
observer.next(result)
16+
pristine = false
17+
},
18+
error(e) {
19+
observer.error(e)
20+
},
21+
complete() {
22+
observer.complete()
23+
},
24+
})
25+
if (pristine) observer.next(result)
26+
return subscription
27+
})
28+
}

0 commit comments

Comments
 (0)