Skip to content

Commit 8a31c53

Browse files
authored
Merge pull request #154 from tusharmath/combine-new
Combine Operator
2 parents 1e9fef2 + 18eb29a commit 8a31c53

File tree

8 files changed

+152
-5
lines changed

8 files changed

+152
-5
lines changed

benchmarks/bm.combine.ts

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/**
2+
* Created by tushar on 08/12/16.
3+
*/
4+
import {Suite} from 'benchmark'
5+
import {combine} from '../src/operators/Combine'
6+
import {reduce} from '../src/operators/Reduce'
7+
import {fromArray} from '../src/sources/FromArray'
8+
import {array, IDeferred, run} from './lib'
9+
10+
const a = array(1e2)
11+
const b = array(1e2)
12+
const c = array(1e2)
13+
const sum3 = (a: number, b: number, c: number) => a + b + c
14+
const sum2 = (a: number, b: number) => a + b
15+
16+
export function bm_fromArray_combine (suite: Suite) {
17+
return suite
18+
.add(
19+
'file -> combine(sum3, [a, b, c]) -> reduce(sum2, 0)',
20+
(d: IDeferred) => run(
21+
reduce(sum2, 0, combine(sum3, [fromArray(a), fromArray(b), fromArray(c)])),
22+
d
23+
),
24+
{defer: true}
25+
)
26+
}

benchmarks/run.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
* Created by tushar.mathur on 01/10/16.
33
*/
44
import {Suite} from 'benchmark'
5+
import {bm_fromArray_combine} from './bm.combine'
56
import {bm_create} from './bm.create'
67
import {bm_debounce} from './bm.debounce'
78
import {bm_fromArray_map_reduce} from './bm.fromArray-map-reduce'
@@ -19,6 +20,7 @@ console.log('**V8:** ', process.versions.v8)
1920
const suite = new Suite()
2021
bm_create(suite)
2122
bm_debounce(suite)
23+
bm_fromArray_combine(suite)
2224
bm_fromArray_map_reduce(suite)
2325
bm_fromArray_scan_reduce(suite)
2426
bm_fromArray_takeN(suite)

extra.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,5 @@
22
* Created by tushar on 10/12/16.
33
*/
44

5-
export {curry} from './src/lib/Utils'
6-
export {EVENT} from './src/testing/Events'
7-
export {TestScheduler} from './src/testing/TestScheduler'
8-
export {marble, toMarble} from './src/testing/Marble'
5+
export {combine} from './src/operators/Combine'
6+
export {debounce} from './src/operators/Debounce'

package.json

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,5 +58,10 @@
5858
"ghooks": {
5959
"commit-msg": "validate-commit-msg"
6060
}
61+
},
62+
"ava": {
63+
"files": [
64+
"test/test.*.js"
65+
]
6166
}
6267
}

src/main.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,8 +2,8 @@
22
* Created by tushar on 17/02/17.
33
*/
44

5+
56
export {create} from './sources/Create'
6-
export {debounce} from './operators/Debounce'
77
export {delay} from './operators/Delay'
88
export {filter} from './operators/Filter'
99
export {forEach} from './lib/ForEach'

src/operators/Combine.ts

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/**
2+
* Created by tushar on 08/12/16.
3+
*/
4+
5+
6+
import {container} from '../lib/Container'
7+
import {IObservable} from '../lib/Observable'
8+
import {IObserver} from '../lib/Observer'
9+
import {IScheduler} from '../lib/Scheduler'
10+
import {CompositeSubscription, ISubscription} from '../lib/Subscription'
11+
import {curry} from '../lib/Utils'
12+
export type TSelector<T> = {(...e: Array<any>): T}
13+
export type TSource = Array<IObservable<any>>
14+
export type TResult <T> = IObservable<T>
15+
16+
class CombineValueObserver<T> implements IObserver<T> {
17+
constructor (private id: number, private sink: CombinedObserver<T>) {
18+
}
19+
20+
next (val: T): void {
21+
this.sink.onNext(val, this.id)
22+
}
23+
24+
error (err: Error): void {
25+
this.sink.onError(err)
26+
}
27+
28+
complete (): void {
29+
this.sink.onComplete(this.id)
30+
}
31+
}
32+
33+
class CombinedObserver<T> {
34+
private collection = container(this.total)
35+
36+
constructor (private func: TSelector<T>, private total: number, private sink: IObserver<T>) {
37+
}
38+
39+
onNext (value: T, id: number) {
40+
this.collection.next(value, id)
41+
if (this.collection.isOn()) {
42+
this.sink.next(this.func.apply(null, this.collection.values))
43+
}
44+
}
45+
46+
onComplete (id: number) {
47+
const hasCompleted = this.collection.complete(id)
48+
if (hasCompleted) {
49+
this.sink.complete()
50+
}
51+
}
52+
53+
onError (err: Error): void {
54+
this.sink.error(err)
55+
}
56+
}
57+
58+
class CombineObservable<T> implements IObservable<T> {
59+
constructor (private selector: TSelector<T>, private sources: Array<IObservable<any>>) {
60+
}
61+
62+
subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
63+
const cSub = new CompositeSubscription()
64+
const ob = new CombinedObserver(this.selector, this.sources.length, observer)
65+
for (var i = 0; i < this.sources.length; ++i) {
66+
cSub.add(this.sources[i].subscribe(new CombineValueObserver(i, ob), scheduler))
67+
}
68+
return cSub
69+
}
70+
}
71+
72+
export const combine = curry(<T> (selector: TSelector<T>, sources: IObservable<any>[]) =>
73+
new CombineObservable(selector, sources)
74+
) as Function &
75+
{<T, R> (selector: TSelector<T>, sources: TSource): TResult<R>} &
76+
{<T, R> (selector: TSelector<T>): {(sources: TSource): TResult<R>}}

test.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/**
2+
* Created by tushar on 13/04/17.
3+
*/
4+
5+
export {EVENT} from './src/testing/Events'
6+
export {TestScheduler} from './src/testing/TestScheduler'
7+
export {marble, toMarble} from './src/testing/Marble'

test/test.Combine.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
/**
2+
* Created by tushar on 26/02/17.
3+
*/
4+
import {test} from 'ava'
5+
import {combine} from '../src/operators/Combine'
6+
import {EVENT} from '../src/testing/Events'
7+
import {marble} from '../src/testing/Marble'
8+
import {TestScheduler} from '../src/testing/TestScheduler'
9+
10+
test(t => {
11+
const SH = TestScheduler.of()
12+
13+
const {results} = SH.start(() => {
14+
return combine(
15+
(a, b, c) => a + b + c,
16+
[
17+
SH.Hot(marble('a-b-c-d|')),
18+
SH.Hot(marble('-p-q-r-s|')),
19+
SH.Hot(marble('---x-y-z--|'))
20+
]
21+
)
22+
})
23+
t.deepEqual(results, [
24+
EVENT.next(230, 'bqx'),
25+
EVENT.next(240, 'cqx'),
26+
EVENT.next(250, 'crx'),
27+
EVENT.next(250, 'cry'),
28+
EVENT.next(260, 'dry'),
29+
EVENT.next(270, 'dsy'),
30+
EVENT.next(270, 'dsz'),
31+
EVENT.complete(300)
32+
])
33+
})

0 commit comments

Comments
 (0)