Skip to content

Commit a201f83

Browse files
committed
feat(join): add join observable
1 parent ffa0882 commit a201f83

File tree

2 files changed

+127
-0
lines changed

2 files changed

+127
-0
lines changed

src/operators/Join.ts

Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
/**
2+
* Created by tushar.mathur on 10/10/16.
3+
*/
4+
5+
6+
import {IObservable} from '../types/core/IObservable';
7+
import {IObserver} from '../types/core/IObserver';
8+
import {IScheduler} from '../types/IScheduler';
9+
import {ISubscription} from '../types/core/ISubscription';
10+
import {CompositeSubscription} from '../testing/Subscription';
11+
12+
13+
export class JoinValueObserver<T> implements IObserver<T> {
14+
constructor (private sink: IObserver<T>, private root: JoinObserver<T>) {
15+
}
16+
17+
next (val: T): void {
18+
this.sink.next(val)
19+
}
20+
21+
error (err: Error): void {
22+
this.sink.error(err)
23+
}
24+
25+
complete (): void {
26+
this.root.subscriptionCompleted()
27+
}
28+
29+
}
30+
31+
export class JoinObserver<T> implements IObserver<IObservable<T>> {
32+
private count: number;
33+
private sourceCompleted: boolean;
34+
35+
constructor (private sink: IObserver<T>, private scheduler: IScheduler, private subscriptions: CompositeSubscription) {
36+
this.sourceCompleted = false
37+
this.count = 0
38+
}
39+
40+
subscriptionCompleted () {
41+
this.count--
42+
this.completeSink()
43+
}
44+
45+
completeSink () {
46+
if (this.sourceCompleted && this.count === 0) {
47+
this.sink.complete()
48+
}
49+
}
50+
51+
52+
next (val: IObservable<T>): void {
53+
const joinValueObserver = new JoinValueObserver(this.sink, this);
54+
this.count++
55+
this.subscriptions.add(
56+
val.subscribe(joinValueObserver, this.scheduler)
57+
)
58+
}
59+
60+
error (err: Error): void {
61+
this.sink.error(err)
62+
}
63+
64+
complete (): void {
65+
this.sourceCompleted = true
66+
this.completeSink()
67+
}
68+
}
69+
70+
71+
export class JoinObservable<T> implements IObservable<T> {
72+
constructor (private source: IObservable<IObservable<T>>) {
73+
}
74+
75+
subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
76+
const subscription = new CompositeSubscription()
77+
subscription.add(
78+
this.source.subscribe(new JoinObserver(observer, scheduler, subscription), scheduler)
79+
)
80+
return subscription
81+
}
82+
}
83+
84+
export function join <T> (source: IObservable<IObservable<T>>) {
85+
return new JoinObservable(source)
86+
}

test/test.JoinObservable.ts

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
/**
2+
* Created by tushar.mathur on 10/10/16.
3+
*/
4+
5+
import test from 'ava';
6+
import {join} from '../src/operators/Join';
7+
import {TestScheduler} from '../src/testing/TestScheduler';
8+
import {ReactiveTest} from '../src/testing/ReactiveTest';
9+
const {next, complete} = ReactiveTest
10+
11+
test('subscribe()', t => {
12+
const sh = TestScheduler.of()
13+
const sa$$ = sh.createColdObservable([
14+
next(10, 'A0'),
15+
next(20, 'A1'),
16+
next(30, 'A2'),
17+
complete(40)
18+
])
19+
const sb$$ = sh.createColdObservable([
20+
next(10, 'B0'),
21+
next(20, 'B1'),
22+
next(30, 'B2'),
23+
complete(40)
24+
])
25+
const s$$ = sh.createColdObservable([
26+
next(10, sa$$),
27+
next(20, sb$$),
28+
complete(100)
29+
])
30+
const {results} = sh.startScheduler<number>(() => join(s$$))
31+
32+
t.deepEqual(results, [
33+
next(220, 'A0'),
34+
next(230, 'A1'),
35+
next(230, 'B0'),
36+
next(240, 'A2'),
37+
next(240, 'B1'),
38+
next(250, 'B2'),
39+
complete(300)
40+
])
41+
})

0 commit comments

Comments
 (0)