Skip to content

Commit 35156ab

Browse files
authored
Merge pull request #25 from tusharmath/skipRepeats
Skip repeats
2 parents c89ffbd + 680d4e3 commit 35156ab

File tree

3 files changed

+89
-0
lines changed

3 files changed

+89
-0
lines changed

src/operators/SkipRepeats.ts

Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/**
2+
* Created by niranjan on 12/10/16.
3+
*/
4+
5+
import {IObservable} from '../types/core/IObservable'
6+
import {IObserver} from '../types/core/IObserver'
7+
import {ISubscription} from '../types/core/ISubscription'
8+
import {IScheduler} from '../types/IScheduler'
9+
import {IHash} from '../types/IHash'
10+
11+
class SkipRepeatsObserver <T, H> implements IObserver<T> {
12+
private hash: H | void = undefined
13+
private init = true
14+
15+
constructor (private hasher: IHash<T, H>, private sink: IObserver<T>) {
16+
}
17+
18+
next (val: T) {
19+
const hash = this.hasher(val)
20+
if (this.init) {
21+
this.init = false
22+
this.sink.next(val)
23+
this.hash = hash
24+
}
25+
else if (this.hash !== hash) {
26+
this.sink.next(val)
27+
this.hash = hash
28+
}
29+
}
30+
31+
error (err: Error) {
32+
this.sink.error(err)
33+
}
34+
35+
complete (): void {
36+
this.sink.complete()
37+
}
38+
}
39+
40+
export class SkipRepeatsObservable <T, H> implements IObservable <T> {
41+
constructor (private hashFunction: IHash<T, H>, private source: IObservable<T>) {
42+
}
43+
44+
subscribe (observer: IObserver<T>, scheduler: IScheduler): ISubscription {
45+
return this.source.subscribe(new SkipRepeatsObserver(this.hashFunction, observer), scheduler)
46+
}
47+
}
48+
49+
export function skipRepeats<T, H> (hashFunction: IHash<T, H>, source: IObservable<T>): IObservable<T> {
50+
return new SkipRepeatsObservable(hashFunction, source)
51+
}

src/types/IHash.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
/**
2+
* Created by niranjan on 12/10/16.
3+
*/
4+
5+
export interface IHash <T, H> {
6+
(a: T): H
7+
}

test/test.SkipRepeatsObservable.ts

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
/**
2+
* Created by niranjan on 12/10/16.
3+
*/
4+
5+
'use strict'
6+
7+
import test from 'ava'
8+
import {skipRepeats} from '../src/operators/SkipRepeats'
9+
import {TestScheduler} from '../src/testing/TestScheduler'
10+
import {ReactiveEvents} from '../src/testing/ReactiveEvents'
11+
12+
const {next, complete} = ReactiveEvents
13+
14+
test('SkipRepeatsObservable.subscribe()', t => {
15+
const sh = TestScheduler.of()
16+
const $ = sh.Cold<number>([
17+
next(210, 0),
18+
next(215, 0),
19+
next(220, 10),
20+
next(230, 20),
21+
next(235, 20),
22+
complete(250)
23+
])
24+
const {results} = sh.start(() => skipRepeats<number, number>((x: number) => x, $))
25+
t.deepEqual(results, [
26+
next(410, 0),
27+
next(420, 10),
28+
next(430, 20),
29+
complete(450)
30+
])
31+
})

0 commit comments

Comments
 (0)