Skip to content

Commit fb7004a

Browse files
volivajosepot
andauthored
Improve performance of partitionByKey with a big number of elements (#232)
* Improve performance of partitionByKey with a big number of elements * Improve performance of partitionByKey by not serialising the keys * partition by key to emit deltas instead of the resulting keys * update tests for partitionByKey * update tests for combineKeys * getGroupedObservable: close outer subscription once the inner stream is found * add performance tests to partitionByKey * make tests more predictable * adjust performance threshold Co-authored-by: Josep M Sobrepere <jm.sobrepere@gmail.com>
1 parent 841a865 commit fb7004a

File tree

6 files changed

+573
-173
lines changed

6 files changed

+573
-173
lines changed

packages/utils/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
"build:cjs:dev": "esbuild src/index.tsx --bundle --outfile=./dist/utils.cjs.development.js --target=es2015 --external:react --external:rxjs --external:@react-rxjs/core --format=cjs --sourcemap",
3131
"build:cjs:prod": "esbuild src/index.tsx --bundle --outfile=./dist/utils.cjs.production.min.js --target=es2015 --external:react --external:rxjs --external:@react-rxjs/core --format=cjs --minify --sourcemap",
3232
"build:ts": "tsc -p ./tsconfig-build.json --outDir ./dist --skipLibCheck --emitDeclarationOnly",
33-
"test": "jest --coverage",
33+
"test": "node --expose-gc ../../node_modules/.bin/jest --coverage --runInBand",
3434
"lint": "prettier --check README.md \"src/**/*.{js,jsx,ts,tsx,json,md}\"",
3535
"format": "prettier --write README.md \"src/**/*.{js,jsx,ts,tsx,json,md}\"",
3636
"prepare": "yarn build"

packages/utils/src/combineKeys.test.ts

Lines changed: 129 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import { concat, NEVER, Observable, of } from "rxjs"
22
import { map, scan } from "rxjs/operators"
33
import { TestScheduler } from "rxjs/testing"
4-
import { combineKeys } from "./"
4+
import { combineKeys, KeyChanges } from "./"
55

66
const scheduler = () =>
77
new TestScheduler((actual, expected) => {
@@ -288,4 +288,132 @@ describe("combineKeys", () => {
288288
})
289289
})
290290
})
291+
292+
describe("with deltas", () => {
293+
it("emits a map with the latest value of the stream of each key added", () => {
294+
scheduler().run(({ expectObservable, cold }) => {
295+
const keys = cold<KeyChanges<string>>(" ab---cd---", {
296+
a: {
297+
type: "add",
298+
keys: ["a"],
299+
},
300+
b: {
301+
type: "add",
302+
keys: ["b"],
303+
},
304+
c: {
305+
type: "add",
306+
keys: ["c"],
307+
},
308+
d: {
309+
type: "add",
310+
keys: ["d"],
311+
},
312+
})
313+
const a = cold(" --1---2---")
314+
const b = cold(" ---------")
315+
const c = cold(" 1----")
316+
const d = cold(" 9---")
317+
const expectedStr = "--e--f(gh)"
318+
319+
const innerStreams: Record<string, Observable<string>> = { a, b, c, d }
320+
321+
const result = combineKeys(
322+
keys,
323+
(v): Observable<string> => innerStreams[v],
324+
).pipe(map((x) => Object.fromEntries(x.entries())))
325+
326+
expectObservable(result).toBe(expectedStr, {
327+
e: { a: "1" },
328+
f: { a: "1", c: "1" },
329+
g: { a: "2", c: "1" },
330+
h: { a: "2", c: "1", d: "9" },
331+
})
332+
})
333+
})
334+
335+
it("removes the entry of a key when that key is removed", () => {
336+
scheduler().run(({ expectObservable, cold }) => {
337+
const expectedStr = " e-f-g-h-"
338+
const keys = cold<KeyChanges<string>>("a-b-c-d-", {
339+
a: {
340+
type: "add",
341+
keys: ["a"],
342+
},
343+
b: {
344+
type: "add",
345+
keys: ["b"],
346+
},
347+
c: {
348+
type: "remove",
349+
keys: ["a"],
350+
},
351+
d: {
352+
type: "remove",
353+
keys: ["b"],
354+
},
355+
})
356+
357+
const result = combineKeys(keys, (v): Observable<string> => of(v)).pipe(
358+
map((x) => Object.fromEntries(x.entries())),
359+
)
360+
361+
expectObservable(result).toBe(expectedStr, {
362+
e: { a: "a" },
363+
f: { a: "a", b: "b" },
364+
g: { b: "b" },
365+
h: {},
366+
})
367+
})
368+
})
369+
370+
it("accepts more than one key change at a time", () => {
371+
scheduler().run(({ expectObservable, cold }) => {
372+
const expectedStr = " e-f-"
373+
const keys = cold<KeyChanges<string>>("a-b-", {
374+
a: {
375+
type: "add",
376+
keys: ["a", "b", "c"],
377+
},
378+
b: {
379+
type: "remove",
380+
keys: ["c", "b"],
381+
},
382+
})
383+
384+
const result = combineKeys(keys, (v): Observable<string> => of(v)).pipe(
385+
map((x) => Object.fromEntries(x.entries())),
386+
)
387+
388+
expectObservable(result).toBe(expectedStr, {
389+
e: { a: "a", b: "b", c: "c" },
390+
f: { a: "a" },
391+
})
392+
})
393+
})
394+
395+
it("omits removing keys that don't exist", () => {
396+
scheduler().run(({ expectObservable, cold }) => {
397+
const expectedStr = " e---"
398+
const keys = cold<KeyChanges<string>>("a-b-", {
399+
a: {
400+
type: "add",
401+
keys: ["a"],
402+
},
403+
b: {
404+
type: "remove",
405+
keys: ["b"],
406+
},
407+
})
408+
409+
const result = combineKeys(keys, (v): Observable<string> => of(v)).pipe(
410+
map((x) => Object.fromEntries(x.entries())),
411+
)
412+
413+
expectObservable(result).toBe(expectedStr, {
414+
e: { a: "a" },
415+
})
416+
})
417+
})
418+
})
291419
})

packages/utils/src/combineKeys.ts

Lines changed: 46 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
import { KeyChanges } from "./partitionByKey"
12
import { Observable, Subscription } from "rxjs"
23

34
export interface MapWithChanges<K, V> extends Map<K, V> {
@@ -12,7 +13,7 @@ export interface MapWithChanges<K, V> extends Map<K, V> {
1213
* @returns An stream with a map containing the latest value from the stream of each key.
1314
*/
1415
export const combineKeys = <K, T>(
15-
keys$: Observable<Array<K> | Set<K>>,
16+
keys$: Observable<Iterable<K> | KeyChanges<K>>,
1617
getInner$: (key: K) => Observable<T>,
1718
): Observable<MapWithChanges<K, T>> =>
1819
new Observable((observer) => {
@@ -24,7 +25,7 @@ export const combineKeys = <K, T>(
2425

2526
const next = () => {
2627
if (!updatingSource) {
27-
const result = Object.assign(new Map(currentValue), {
28+
const result = Object.assign(currentValue, {
2829
changes,
2930
})
3031
changes = new Set<K>()
@@ -36,20 +37,42 @@ export const combineKeys = <K, T>(
3637
const subscription = keys$.subscribe(
3738
(nextKeysArr) => {
3839
updatingSource = true
39-
const nextKeys = new Set(nextKeysArr)
40-
innerSubscriptions.forEach((sub, key) => {
41-
if (!nextKeys.has(key)) {
42-
sub.unsubscribe()
43-
innerSubscriptions.delete(key)
44-
if (currentValue.has(key)) {
45-
changes.add(key)
46-
currentValue.delete(key)
47-
}
48-
} else {
49-
nextKeys.delete(key)
40+
41+
const keys = new Set(
42+
inputIsKeyChanges(nextKeysArr) ? nextKeysArr.keys : nextKeysArr,
43+
)
44+
45+
if (inputIsKeyChanges(nextKeysArr)) {
46+
if (nextKeysArr.type === "remove") {
47+
keys.forEach((key) => {
48+
const sub = innerSubscriptions.get(key)
49+
if (!sub) return
50+
sub.unsubscribe()
51+
innerSubscriptions.delete(key)
52+
if (currentValue.has(key)) {
53+
changes.add(key)
54+
currentValue.delete(key)
55+
}
56+
})
57+
// Keys after this block is the list of keys to add. Clear it.
58+
keys.clear()
5059
}
51-
})
52-
nextKeys.forEach((key) => {
60+
} else {
61+
innerSubscriptions.forEach((sub, key) => {
62+
if (!keys.has(key)) {
63+
sub.unsubscribe()
64+
innerSubscriptions.delete(key)
65+
if (currentValue.has(key)) {
66+
changes.add(key)
67+
currentValue.delete(key)
68+
}
69+
} else {
70+
keys.delete(key)
71+
}
72+
})
73+
}
74+
75+
keys.forEach((key) => {
5376
innerSubscriptions.set(
5477
key,
5578
getInner$(key).subscribe(
@@ -66,12 +89,13 @@ export const combineKeys = <K, T>(
6689
),
6790
)
6891
})
92+
6993
updatingSource = false
7094
// If there are no changes but the nextKeys are an empty iterator
7195
// and we have never emitted before, that means that the first
7296
// value that keys$ has emitted is an empty iterator, therefore
7397
// we should emit an empy Map
74-
if (changes.size || (isPristine && !nextKeys.size)) next()
98+
if (changes.size || (isPristine && !keys.size)) next()
7599
},
76100
(e) => {
77101
observer.error(e)
@@ -88,3 +112,9 @@ export const combineKeys = <K, T>(
88112
})
89113
}
90114
})
115+
116+
function inputIsKeyChanges<K>(
117+
input: Iterable<K> | KeyChanges<K>,
118+
): input is KeyChanges<K> {
119+
return "type" in input && "keys" in input
120+
}

packages/utils/src/index.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ export { combineKeys, MapWithChanges } from "./combineKeys"
22
export { createSignal } from "./createSignal"
33
export { createKeyedSignal } from "./createKeyedSignal"
44
export { mergeWithKey } from "./mergeWithKey"
5-
export { partitionByKey } from "./partitionByKey"
5+
export { partitionByKey, KeyChanges } from "./partitionByKey"
66
export { suspend } from "./suspend"
77
export { suspended } from "./suspended"
88
export { switchMapSuspended } from "./switchMapSuspended"

0 commit comments

Comments
 (0)