Skip to content

Commit 38fd3c7

Browse files
committed
fix(core): factory bind multiple (un/re)subscriptions
1 parent 6cc1b36 commit 38fd3c7

File tree

2 files changed

+86
-3
lines changed

2 files changed

+86
-3
lines changed

packages/core/src/bind/connectFactoryObservable.test.tsx

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {
99
Subject,
1010
} from "rxjs"
1111
import { renderHook, act as actHook } from "@testing-library/react-hooks"
12-
import { switchMap, delay } from "rxjs/operators"
12+
import { switchMap, delay, take } from "rxjs/operators"
1313
import { FC, Suspense, useState } from "react"
1414
import React from "react"
1515
import {
@@ -444,5 +444,80 @@ describe("connectFactoryObservable", () => {
444444
expect(sub4.closed).toBe(false)
445445
sub4.unsubscribe()
446446
})
447+
448+
describe("re-subscriptions on disposed observables", () => {
449+
it("registers itself when no other observable has been registered for that key", () => {
450+
const key = 0
451+
let sideEffects = 0
452+
453+
const [, getShared] = bind((_: number) =>
454+
defer(() => {
455+
return of(++sideEffects)
456+
}),
457+
)
458+
459+
const stream = getShared(key)
460+
461+
let val
462+
stream.pipe(take(1)).subscribe((x) => {
463+
val = x
464+
})
465+
expect(val).toBe(1)
466+
467+
stream.pipe(take(1)).subscribe((x) => {
468+
val = x
469+
})
470+
expect(val).toBe(2)
471+
472+
const subscription = stream.subscribe((x) => {
473+
val = x
474+
})
475+
expect(val).toBe(3)
476+
477+
getShared(key)
478+
.pipe(take(1))
479+
.subscribe((x) => {
480+
val = x
481+
})
482+
expect(val).toBe(3)
483+
subscription.unsubscribe()
484+
})
485+
486+
it("subscribes to the currently registered observable if a new observalbe has been registered for that key", () => {
487+
const key = 0
488+
let sideEffects = 0
489+
490+
const [, getShared] = bind((_: number) =>
491+
defer(() => {
492+
return of(++sideEffects)
493+
}),
494+
)
495+
496+
const stream = getShared(key)
497+
498+
let val
499+
stream.pipe(take(1)).subscribe((x) => {
500+
val = x
501+
})
502+
expect(val).toBe(1)
503+
504+
const subscription = getShared(key).subscribe((x) => {
505+
val = x
506+
})
507+
expect(val).toBe(2)
508+
509+
stream.pipe(take(1)).subscribe((x) => {
510+
val = x
511+
})
512+
expect(val).toBe(2)
513+
514+
stream.pipe(take(1)).subscribe((x) => {
515+
val = x
516+
})
517+
expect(val).toBe(2)
518+
519+
subscription.unsubscribe()
520+
})
521+
})
447522
})
448523
})

packages/core/src/bind/connectFactoryObservable.ts

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Observable } from "rxjs"
1+
import { Observable, defer } from "rxjs"
22
import shareLatest from "../internal/share-latest"
33
import reactEnhancer from "../internal/react-enhancer"
44
import { BehaviorObservable } from "../internal/BehaviorObservable"
@@ -55,8 +55,16 @@ export default function connectFactoryObservable<A extends [], O>(
5555

5656
const reactObservable$ = reactEnhancer(sharedObservable$)
5757

58+
const publicShared$: Observable<O> = defer(() => {
59+
const inCache = cache.get(keys)
60+
if (inCache) {
61+
return inCache[0] === publicShared$ ? sharedObservable$ : inCache[0]
62+
}
63+
return getSharedObservables$(input)[0]
64+
})
65+
5866
const result: [Observable<O>, BehaviorObservable<O>] = [
59-
sharedObservable$,
67+
publicShared$,
6068
reactObservable$,
6169
]
6270

0 commit comments

Comments
 (0)