Skip to content

Commit 9e33084

Browse files
volivajosepot
authored andcommitted
reimplement partitionByKey
1 parent 71173d3 commit 9e33084

File tree

2 files changed

+191
-9
lines changed

2 files changed

+191
-9
lines changed

packages/utils/src/partitionByKey.test.ts

Lines changed: 99 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ describe("partitionByKey", () => {
1212
describe("activeKeys$", () => {
1313
it("emits a list with all the active keys", () => {
1414
scheduler().run(({ expectObservable, cold }) => {
15-
const source = cold("-ab---cd---")
15+
const source = cold("-ab-a-cd---")
1616
const expectedStr = "efg---hi---"
1717
const [, result] = partitionByKey(
1818
source,
@@ -30,14 +30,31 @@ describe("partitionByKey", () => {
3030
})
3131
})
3232

33+
it("emits all the synchronous groups in a single emission", () => {
34+
scheduler().run(({ expectObservable, cold }) => {
35+
const source = concat(of("a", "b"), cold("--c--"))
36+
const expectedStr = " g-h--"
37+
const [, result] = partitionByKey(
38+
source,
39+
(v) => v,
40+
() => NEVER,
41+
)
42+
43+
expectObservable(result).toBe(expectedStr, {
44+
g: ["a", "b"],
45+
h: ["a", "b", "c"],
46+
})
47+
})
48+
})
49+
3350
it("removes a key from the list when its inner stream completes", () => {
3451
scheduler().run(({ expectObservable, cold }) => {
3552
const source = cold("-ab---c--")
3653
const a = cold(" --1---2-")
3754
const b = cold(" ---|")
3855
const c = cold(" 1-|")
3956
const expectedStr = "efg--hi-j"
40-
const innerStreams: Record<string, Observable<any>> = { a, b, c }
57+
const innerStreams: Record<string, Observable<string>> = { a, b, c }
4158
const [, result] = partitionByKey(
4259
source,
4360
(v) => v,
@@ -65,7 +82,7 @@ describe("partitionByKey", () => {
6582
const a = cold(" --1---2-|")
6683
const b = cold(" ---|")
6784
const expectedStr = "efg--h---(i|)"
68-
const innerStreams = { a, b }
85+
const innerStreams: Record<string, Observable<string>> = { a, b }
6986
const [, result] = partitionByKey(
7087
source,
7188
(v) => v,
@@ -85,6 +102,84 @@ describe("partitionByKey", () => {
85102
})
86103
})
87104
})
105+
106+
it("completes when no key is alive and the source completes", () => {
107+
scheduler().run(({ expectObservable, cold }) => {
108+
const source = cold("-ab---|")
109+
const a = cold(" --1|")
110+
const b = cold(" ---|")
111+
const expectedStr = "efg-hi|"
112+
const innerStreams: Record<string, Observable<string>> = { a, b }
113+
const [, result] = partitionByKey(
114+
source,
115+
(v) => v,
116+
(v$) =>
117+
v$.pipe(
118+
take(1),
119+
switchMap((v) => innerStreams[v]),
120+
),
121+
)
122+
123+
expectObservable(result).toBe(expectedStr, {
124+
e: [],
125+
f: ["a"],
126+
g: ["a", "b"],
127+
h: ["b"],
128+
i: [],
129+
})
130+
})
131+
})
132+
133+
it("errors when the source emits an error", () => {
134+
scheduler().run(({ expectObservable, cold }) => {
135+
const source = cold("-ab--#")
136+
const a = cold(" --1---2")
137+
const b = cold(" ------")
138+
const expectedStr = "efg--#"
139+
const innerStreams: Record<string, Observable<string>> = { a, b }
140+
const [, result] = partitionByKey(
141+
source,
142+
(v) => v,
143+
(v$) =>
144+
v$.pipe(
145+
take(1),
146+
switchMap((v) => innerStreams[v]),
147+
),
148+
)
149+
150+
expectObservable(result).toBe(expectedStr, {
151+
e: [],
152+
f: ["a"],
153+
g: ["a", "b"],
154+
})
155+
})
156+
})
157+
158+
it("removes a key when its inner stream emits an error", () => {
159+
scheduler().run(({ expectObservable, cold }) => {
160+
const source = cold("-ab-----")
161+
const a = cold(" --1-#")
162+
const b = cold(" ------")
163+
const expectedStr = "efg--h"
164+
const innerStreams: Record<string, Observable<string>> = { a, b }
165+
const [, result] = partitionByKey(
166+
source,
167+
(v) => v,
168+
(v$) =>
169+
v$.pipe(
170+
take(1),
171+
switchMap((v) => innerStreams[v]),
172+
),
173+
)
174+
175+
expectObservable(result).toBe(expectedStr, {
176+
e: [],
177+
f: ["a"],
178+
g: ["a", "b"],
179+
h: ["b"],
180+
})
181+
})
182+
})
88183
})
89184

90185
describe("getInstance$", () => {
@@ -98,7 +193,7 @@ describe("partitionByKey", () => {
98193
const expectB = " -----|"
99194
const expectC = " ------1-|"
100195

101-
const innerStreams: Record<string, Observable<any>> = { a, b, c }
196+
const innerStreams: Record<string, Observable<string>> = { a, b, c }
102197
const [getInstance$] = partitionByKey(
103198
source,
104199
(v) => v,

packages/utils/src/partitionByKey.ts

Lines changed: 92 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,13 @@
1-
import { GroupedObservable, Observable } from "rxjs"
1+
import { shareLatest } from "@react-rxjs/core"
2+
import {
3+
GroupedObservable,
4+
noop,
5+
Observable,
6+
Subject,
7+
Subscription,
8+
} from "rxjs"
29
import { map } from "rxjs/operators"
3-
import { collect, getGroupedObservable, split } from "./"
10+
import { getGroupedObservable } from "./"
411

512
/**
613
* Groups the elements from the source stream by using `keySelector`, returning
@@ -18,9 +25,89 @@ export function partitionByKey<T, K, R>(
1825
keySelector: (value: T) => K,
1926
streamSelector: (grouped: Observable<T>, key: K) => Observable<R>,
2027
): [(key: K) => GroupedObservable<K, R>, Observable<K[]>] {
21-
const source$ = stream.pipe(split(keySelector, streamSelector), collect())
28+
const groupedObservables$ = new Observable<Map<K, GroupedObservable<K, R>>>(
29+
(subscriber) => {
30+
const groups: Map<K, InnerGroup<T, K, R>> = new Map()
31+
32+
let warmup = true
33+
let sourceCompleted = false
34+
const sub = stream.subscribe(
35+
(x) => {
36+
const key = keySelector(x)
37+
if (groups.has(key)) {
38+
return groups.get(key)!.source.next(x)
39+
}
40+
41+
const subject = new Subject<T>()
42+
43+
const res = streamSelector(subject, key).pipe(
44+
shareLatest(),
45+
) as GroupedObservable<K, R>
46+
res.key = key
47+
48+
const innerGroup: InnerGroup<T, K, R> = {
49+
source: subject,
50+
observable: res,
51+
subscription: new Subscription(),
52+
}
53+
groups.set(key, innerGroup)
54+
55+
const onFinish = () => {
56+
groups.delete(key)
57+
subscriber.next(mapGroups(groups))
58+
59+
if (groups.size === 0 && sourceCompleted) {
60+
subscriber.complete()
61+
}
62+
}
63+
innerGroup.subscription = res.subscribe(noop, onFinish, onFinish)
64+
65+
subject.next(x)
66+
if (!warmup) {
67+
subscriber.next(mapGroups(groups))
68+
}
69+
},
70+
(e) => {
71+
subscriber.error(e)
72+
},
73+
() => {
74+
sourceCompleted = true
75+
if (groups.size === 0) {
76+
subscriber.complete()
77+
}
78+
groups.forEach((g) => g.source.complete())
79+
},
80+
)
81+
82+
warmup = false
83+
subscriber.next(mapGroups(groups))
84+
85+
return () => {
86+
sub.unsubscribe()
87+
groups.forEach((g) => {
88+
g.source.unsubscribe()
89+
g.subscription.unsubscribe()
90+
})
91+
}
92+
},
93+
).pipe(shareLatest())
94+
2295
return [
23-
(key: K) => getGroupedObservable(source$, key),
24-
source$.pipe(map((x) => Array.from(x.keys()))),
96+
(key: K) => getGroupedObservable(groupedObservables$, key),
97+
groupedObservables$.pipe(map((x) => Array.from(x.keys()))),
2598
]
2699
}
100+
101+
interface InnerGroup<T, K, R> {
102+
source: Subject<T>
103+
observable: GroupedObservable<K, R>
104+
subscription: Subscription
105+
}
106+
107+
function mapGroups<T, K, R>(
108+
groups: Map<K, InnerGroup<T, K, R>>,
109+
): Map<K, GroupedObservable<K, R>> {
110+
return new Map(
111+
Array.from(groups.entries()).map(([key, group]) => [key, group.observable]),
112+
)
113+
}

0 commit comments

Comments
 (0)