Skip to content

Commit 0388f78

Browse files
authored
Merge pull request #33 from tusharmath/switch
feat(switch): add switch operator
2 parents 9a9d330 + 3bf2e3e commit 0388f78

File tree

3 files changed

+115
-2
lines changed

3 files changed

+115
-2
lines changed

src/main.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44

55
// Observer
66
export {Observer} from './lib/Observer'
7-
87
// Operators
98
import {filter as _filter} from './operators/Filter'
109
import {join as _join} from './operators/Join'
@@ -13,7 +12,7 @@ import {scan as _scan} from './operators/Scan'
1312
import {slice as _slice} from './operators/Slice'
1413
import {tap as _tap} from './operators/Tap'
1514
import {reduce as _reduce} from './operators/Reduce'
16-
15+
import {switchLatest as _switchLatest} from './operators/Switch'
1716
// Sources
1817
import {fromArray as _fromArray} from './sources/FromArray'
1918
import {interval as _interval} from './sources/Interval'
@@ -31,3 +30,4 @@ export const reduce = Curry(_reduce)
3130
export const fromArray = Curry(_fromArray)
3231
export const interval = Curry(_interval)
3332
export const fromDOM = Curry(_fromDOM)
33+
export const switchLatest = Curry(_switchLatest)

src/operators/Switch.ts

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
/**
2+
* Created by tushar.mathur on 16/10/16.
3+
*/
4+
import {IObservable} from '../types/core/IObservable'
5+
import {ISubscription} from '../types/core/ISubscription'
6+
import {IScheduler} from '../types/IScheduler'
7+
import {IObserver} from '../types/core/IObserver'
8+
import {CompositeSubscription} from '../lib/CompositeSubscription'
9+
import {Node} from '../lib/LinkedList'
10+
11+
12+
export class SwitchValueObserver<T> implements IObserver<T> {
13+
constructor (private sink: IObserver<T>) {
14+
}
15+
16+
next (val: T): void {
17+
this.sink.next(val)
18+
}
19+
20+
error (err: Error): void {
21+
this.sink.error(err)
22+
}
23+
24+
complete (): void {
25+
}
26+
}
27+
28+
export class SwitchObserver<T> implements IObserver<IObservable<T>> {
29+
private currentSub: Node<ISubscription> | undefined = void 0
30+
31+
constructor (private sink: IObserver<T>,
32+
private cSub: CompositeSubscription,
33+
private scheduler: IScheduler) {
34+
}
35+
36+
private removeCurrentSub () {
37+
if (this.currentSub) this.cSub.remove(this.currentSub)
38+
}
39+
40+
private setCurrentSub (subscription: ISubscription) {
41+
this.removeCurrentSub()
42+
this.currentSub = this.cSub.add(subscription)
43+
}
44+
45+
next (val: IObservable<T>): void {
46+
this.setCurrentSub(val.subscribe(new SwitchValueObserver(this.sink), this.scheduler))
47+
}
48+
49+
error (err: Error): void {
50+
this.sink.error(err)
51+
}
52+
53+
complete (): void {
54+
this.removeCurrentSub()
55+
this.sink.complete()
56+
}
57+
}
58+
59+
export class SwitchLatest<T> implements IObservable<T> {
60+
constructor (private source: IObservable<IObservable<T>>) {
61+
}
62+
63+
subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
64+
const cSub = new CompositeSubscription()
65+
cSub.add(this.source.subscribe(new SwitchObserver(observer, cSub, scheduler), scheduler))
66+
return cSub
67+
}
68+
}
69+
70+
export function switchLatest<T> (source: IObservable<IObservable<T>>) {
71+
return new SwitchLatest(source)
72+
}

test/test.Switch.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Created by tushar.mathur on 16/10/16.
3+
*/
4+
import test from 'ava'
5+
import {TestScheduler} from '../src/testing/TestScheduler'
6+
import {ReactiveEvents} from '../src/testing/ReactiveEvents'
7+
import {switchLatest} from '../src/operators/Switch'
8+
9+
10+
test(t => {
11+
const sh = TestScheduler.of()
12+
const a$$ = sh.Hot([
13+
ReactiveEvents.next(210, 'A0'),
14+
ReactiveEvents.next(220, 'A1'),
15+
ReactiveEvents.next(230, 'A2'),
16+
ReactiveEvents.next(240, 'A3'),
17+
ReactiveEvents.complete(250)
18+
])
19+
20+
const b$$ = sh.Hot([
21+
ReactiveEvents.next(230, 'B0'),
22+
ReactiveEvents.next(240, 'B1'),
23+
ReactiveEvents.complete(250)
24+
])
25+
26+
const source$ = sh.Hot([
27+
ReactiveEvents.next(205, a$$),
28+
ReactiveEvents.next(225, b$$),
29+
ReactiveEvents.complete(300)
30+
])
31+
const {results} = sh.start(() => switchLatest(source$))
32+
t.deepEqual(results, [
33+
ReactiveEvents.next(210, 'A0'),
34+
ReactiveEvents.next(220, 'A1'),
35+
ReactiveEvents.next(230, 'B0'),
36+
ReactiveEvents.next(240, 'B1'),
37+
ReactiveEvents.complete(300)
38+
])
39+
t.deepEqual(a$$.subscriptions.map(r => r.time), [205, 225])
40+
t.deepEqual(b$$.subscriptions.map(r => r.time), [225, 300])
41+
})

0 commit comments

Comments
 (0)