Skip to content

Commit ec8f8ad

Browse files
josepotvoliva
andauthored
partitionByKey fixes and improvements (#251)
* `partitionByKey` fixes and improvements * fix `pendingFirstAdd` * add order to sync test * Update packages/utils/src/partitionByKey.ts Co-authored-by: Victor Oliva <olivarra1@gmail.com> Co-authored-by: Victor Oliva <olivarra1@gmail.com>
1 parent 78416fb commit ec8f8ad

File tree

2 files changed

+85
-33
lines changed

2 files changed

+85
-33
lines changed

packages/utils/src/partitionByKey.test.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,47 @@ describe("partitionByKey", () => {
610610
expectObservable(getInstance$("b")).toBe(expectB)
611611
})
612612
})
613+
614+
it("synchronously emits when the group observable notifies of a new GroupedObservable", () => {
615+
const subject = new Subject<number>()
616+
const [getInner$, keys$] = partitionByKey(subject, (x) => x, take(1))
617+
618+
const key = 8
619+
let receivedValue = 0
620+
let deleted: number[] = []
621+
let done = false
622+
let order: string[] = []
623+
keys$.subscribe((keys) => {
624+
if (keys.type === "add") {
625+
order.push("outer add")
626+
getInner$([...keys.keys][0]).subscribe({
627+
next: (x) => {
628+
receivedValue = x
629+
order.push("inner next")
630+
},
631+
complete: () => {
632+
order.push("inner complete")
633+
done = true
634+
},
635+
})
636+
} else {
637+
order.push("outer delete")
638+
deleted = [...keys.keys]
639+
}
640+
})
641+
642+
subject.next(key)
643+
644+
expect(receivedValue).toBe(key)
645+
expect(done).toBe(true)
646+
expect(deleted).toEqual([key])
647+
expect(order).toEqual([
648+
"outer add",
649+
"inner next",
650+
"outer delete",
651+
"inner complete",
652+
])
653+
})
613654
})
614655

615656
describe("performance", () => {

packages/utils/src/partitionByKey.ts

Lines changed: 44 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import { shareLatest } from "@react-rxjs/core"
22
import {
3-
defer,
43
GroupedObservable,
54
identity,
65
noop,
76
Observable,
87
Subject,
98
Subscription,
109
} from "rxjs"
11-
import { finalize, map } from "rxjs/operators"
10+
import { map } from "rxjs/operators"
1211

1312
export interface KeyChanges<K> {
1413
type: "add" | "remove"
@@ -59,23 +58,55 @@ export function partitionByKey<T, K, R>(
5958
const groups: Map<K, InnerGroup<T, K, R>> = new Map()
6059

6160
let sourceCompleted = false
61+
const finalize =
62+
(type: "error" | "complete") =>
63+
(...args: any[]) => {
64+
sourceCompleted = true
65+
if (groups.size) {
66+
groups.forEach((g) => (g.source[type] as any)(...args))
67+
} else {
68+
subscriber[type](...args)
69+
}
70+
}
71+
6272
const sub = stream.subscribe(
6373
(x) => {
6474
const key = keySelector(x)
65-
if (groups.has(key)) {
66-
return groups.get(key)!.source.next(x)
75+
if (groups.has(key)) return groups.get(key)!.source.next(x)
76+
77+
let pendingFirstAdd = true
78+
const emitFirstAdd = () => {
79+
if (pendingFirstAdd) {
80+
pendingFirstAdd = false
81+
subscriber.next({
82+
groups,
83+
changes: {
84+
type: "add",
85+
keys: [key],
86+
},
87+
})
88+
}
6789
}
6890

6991
const subject = new Subject<T>()
92+
let pendingFirstVal = true
93+
const emitFirstValue = () => {
94+
if (pendingFirstVal) {
95+
pendingFirstVal = false
96+
subject.next(x)
97+
}
98+
}
7099

71100
const shared$ = shareLatest()(
72101
(streamSelector || identity)(subject, key),
73102
)
74-
75-
const res = defer(() => {
103+
const res = new Observable((observer) => {
76104
incRefcount()
77-
return shared$
78-
}).pipe(finalize(() => decRefcount())) as any as GroupedObservable<K, R>
105+
const subscription = shared$.subscribe(observer)
106+
subscription.add(decRefcount)
107+
emitFirstValue()
108+
return subscription
109+
}) as any as GroupedObservable<K, R>
79110
;(res as any).key = key
80111

81112
const innerGroup: InnerGroup<T, K, R> = {
@@ -85,19 +116,12 @@ export function partitionByKey<T, K, R>(
85116
}
86117
groups.set(key, innerGroup)
87118

88-
subscriber.next({
89-
groups,
90-
changes: {
91-
type: "add",
92-
keys: [key],
93-
},
94-
})
95-
96119
innerGroup.subscription = shared$.subscribe(
97120
noop,
98121
(e) => subscriber.error(e),
99122
() => {
100123
groups.delete(key)
124+
emitFirstAdd()
101125
subscriber.next({
102126
groups,
103127
changes: {
@@ -111,24 +135,11 @@ export function partitionByKey<T, K, R>(
111135
}
112136
},
113137
)
114-
subject.next(x)
115-
},
116-
(e) => {
117-
sourceCompleted = true
118-
if (groups.size) {
119-
groups.forEach((g) => g.source.error(e))
120-
} else {
121-
subscriber.error(e)
122-
}
123-
},
124-
() => {
125-
sourceCompleted = true
126-
if (groups.size) {
127-
groups.forEach((g) => g.source.complete())
128-
} else {
129-
subscriber.complete()
130-
}
138+
emitFirstAdd()
139+
emitFirstValue()
131140
},
141+
finalize("error"),
142+
finalize("complete"),
132143
)
133144

134145
return () => {

0 commit comments

Comments
 (0)