Skip to content

Commit 54a461a

Browse files
authored
Merge pull request #12 from tusharmath/test-scheduler
Test scheduler
2 parents 24d65e6 + 1e772b7 commit 54a461a

11 files changed

+159
-42
lines changed

benchmark.txt

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,3 +24,7 @@ Sun Oct 9 19:01:52 IST 2016
2424
file -> map -> reduce x 545 ops/sec ±1.09% (80 runs sampled)
2525
file -> takeN(0, n/10) x 255 ops/sec ±0.65% (78 runs sampled)
2626
file -> scan -> reduce x 82.85 ops/sec ±2.59% (74 runs sampled)
27+
Tue Oct 11 11:37:52 IST 2016
28+
file -> map -> reduce x 522 ops/sec ±1.24% (78 runs sampled)
29+
file -> takeN(0, n/10) x 254 ops/sec ±0.86% (80 runs sampled)
30+
file -> scan -> reduce x 83.45 ops/sec ±1.08% (75 runs sampled)

src/testing/ColdTestObservable.ts

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/**
2+
* Created by tushar.mathur on 10/10/16.
3+
*/
4+
5+
import {IEvent, EventType} from '../types/IEvent';
6+
import {TestObservable} from './TestObservable';
7+
import {ISubscriptionObserver} from '../types/core/ISubscriptionObserver';
8+
import {EventNext} from './ReactiveTest';
9+
import {IScheduler} from '../types/IScheduler';
10+
11+
export function ColdTestObservable<T> (scheduler: IScheduler, events: Array<IEvent>) {
12+
return new TestObservable((observer: ISubscriptionObserver<any>) => {
13+
let closed = false
14+
for (var i = 0; i < events.length && !closed; i++) {
15+
const event = events[i]
16+
if (event.type === EventType.next) {
17+
scheduler.schedule(() => observer.next((<EventNext<any>> event).value), event.time)
18+
}
19+
else if (event.type === EventType.complete) {
20+
scheduler.schedule(() => observer.complete(), event.time)
21+
}
22+
}
23+
return {
24+
unsubscribe () {
25+
closed = true
26+
},
27+
closed
28+
}
29+
})
30+
}

src/testing/HotTestObservable.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import {EventType, IEvent} from '../types/IEvent';
2+
import {TestObservable} from './TestObservable';
3+
import {ISubscriptionObserver} from '../types/core/ISubscriptionObserver';
4+
import {EventNext, EventError} from './ReactiveTest';
5+
import {IObserver} from '../types/core/IObserver';
6+
import {TestScheduler} from './TestScheduler';
7+
/**
8+
* Created by tushar.mathur on 10/10/16.
9+
*/
10+
11+
12+
export function dispatchEvents<T> (event: IEvent, observers: Array<IObserver<T>>, closed: Array<boolean>) {
13+
observers
14+
.filter((x, i) => !closed[i])
15+
.forEach(ob => {
16+
if (event.type === EventType.next) return ob.next((event as EventNext<T>).value)
17+
if (event.type === EventType.complete) return ob.complete()
18+
if (event.type === EventType.error) return ob.error((event as EventError).value)
19+
})
20+
}
21+
22+
export function HotTestObservable<T> (scheduler: TestScheduler, events: Array<IEvent>) {
23+
const observers: Array<IObserver<T>> = []
24+
const closed: Array<boolean> = []
25+
events.forEach(ev => {
26+
scheduler.scheduleAbsolute(
27+
() => dispatchEvents(ev, observers, closed),
28+
ev.time)
29+
}
30+
)
31+
return new TestObservable((ob: ISubscriptionObserver<T>) => {
32+
const i = observers.push(ob) - 1
33+
closed[i] = false
34+
return {
35+
unsubscribe () {
36+
closed[i] = true
37+
},
38+
get closed () {
39+
return closed[i]
40+
}
41+
}
42+
})
43+
}

src/testing/TestScheduler.ts

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,11 @@ import {ITask} from '../types/ITask';
66
import {IScheduler} from '../types/IScheduler';
77
import {IObservable} from '../types/core/IObservable';
88
import {ISubscription} from '../types/core/ISubscription';
9-
import {EventNext} from './ReactiveTest';
10-
import {IEvent, EventType} from '../types/IEvent';
9+
import {IEvent} from '../types/IEvent';
1110
import {TestObserver} from './TestObserver';
12-
import {TestObservable} from './TestObservable';
13-
import {ISubscriptionObserver} from '../types/core/ISubscriptionObserver';
11+
import {ColdTestObservable} from './ColdTestObservable';
1412
import {ISchedulingStrategy} from '../types/ISchedulingStrategy';
13+
import {HotTestObservable} from './HotTestObservable';
1514

1615
class TaskSchedule {
1716
constructor (public task: ITask, public time: number) {
@@ -90,8 +89,8 @@ export class TestScheduler implements IScheduler {
9089
this.queue = residual
9190
}
9291

93-
startScheduler<T> (f: () => IObservable<T>,
94-
timing: {start: number, stop: number} = DEFAULT_TIMING): TestObserver<T> {
92+
start<T> (f: () => IObservable<T>,
93+
timing: {start: number, stop: number} = DEFAULT_TIMING): TestObserver<T> {
9594
var subscription: ISubscription
9695
var resultsObserver = new TestObserver(this);
9796
this.scheduleAbsolute(() => subscription = f().subscribe(resultsObserver, this), timing.start)
@@ -102,25 +101,12 @@ export class TestScheduler implements IScheduler {
102101
return resultsObserver
103102
}
104103

105-
createColdObservable <T> (events: Array<IEvent>): IObservable<T> {
106-
return new TestObservable((observer: ISubscriptionObserver<any>) => {
107-
let closed = false
108-
for (var i = 0; i < events.length && !closed; i++) {
109-
const event = events[i]
110-
if (event.type === EventType.next) {
111-
this.schedule(() => observer.next((<EventNext<any>> event).value), event.time)
112-
}
113-
else if (event.type === EventType.complete) {
114-
this.schedule(() => observer.complete(), event.time)
115-
}
116-
}
117-
return {
118-
unsubscribe () {
119-
closed = true
120-
},
121-
closed
122-
}
123-
})
104+
Cold <T> (events: Array<IEvent>) {
105+
return ColdTestObservable(this, events)
106+
}
107+
108+
Hot <T> (events: Array<IEvent>) {
109+
return HotTestObservable(this, events)
124110
}
125111

126112
static of () {

src/types/IScheduler.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ import {ITask} from './ITask';
77
import {ISchedulingStrategy} from './ISchedulingStrategy';
88

99
export interface IScheduler {
10-
schedule(task: ITask, time: number): ISubscription
10+
schedule(task: ITask, relativeTime: number): ISubscription
1111
scheduleASAP(task: ITask): ISubscription
1212
scheduleNow(task: ITask): ISubscription
1313
scheduleRepeatedly(task: ITask, interval: number): ISubscription

test/test.FromArray.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ const {next, error} = ReactiveTest
1414
test(t => {
1515
const sh = TestScheduler.of()
1616
const testFunction = (x: any) => x === 2 ? x.do() : x * 100
17-
const {results} = sh.startScheduler(() => map(testFunction, fromArray([1, 2, 3])))
17+
const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3])))
1818
t.deepEqual(results, [
1919
next(201, 100),
2020
error(201, new TypeError())

test/test.IntervalObservable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ const {next, error} = ReactiveTest
1313

1414
test('subscribe()', t => {
1515
const sh = TestScheduler.of()
16-
const {results} = sh.startScheduler<number>(() => interval(200))
16+
const {results} = sh.start<number>(() => interval(200))
1717
t.deepEqual(results, [
1818
next(400, 0),
1919
next(600, 1),

test/test.JoinObservable.ts

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,24 +10,24 @@ const {next, complete} = ReactiveTest
1010

1111
test('subscribe()', t => {
1212
const sh = TestScheduler.of()
13-
const sa$$ = sh.createColdObservable([
13+
const sa$$ = sh.Cold([
1414
next(10, 'A0'),
1515
next(20, 'A1'),
1616
next(30, 'A2'),
1717
complete(40)
1818
])
19-
const sb$$ = sh.createColdObservable([
19+
const sb$$ = sh.Cold([
2020
next(10, 'B0'),
2121
next(20, 'B1'),
2222
next(30, 'B2'),
2323
complete(40)
2424
])
25-
const s$$ = sh.createColdObservable([
25+
const s$$ = sh.Cold([
2626
next(10, sa$$),
2727
next(20, sb$$),
2828
complete(100)
2929
])
30-
const {results} = sh.startScheduler<number>(() => join(s$$))
30+
const {results} = sh.start<number>(() => join(s$$))
3131

3232
t.deepEqual(results, [
3333
next(220, 'A0'),
@@ -39,3 +39,39 @@ test('subscribe()', t => {
3939
complete(300)
4040
])
4141
})
42+
43+
44+
test('subscribe():hot', t => {
45+
const sh = TestScheduler.of()
46+
47+
// start(210)
48+
const sa$$ = sh.Hot([
49+
next(211, 'A0'),
50+
next(220, 'A1'),
51+
next(230, 'A2'),
52+
complete(240)
53+
])
54+
55+
// start(220)
56+
const sb$$ = sh.Hot([
57+
next(205, 'B0'),
58+
next(215, 'B1'),
59+
next(225, 'B2'),
60+
complete(340)
61+
])
62+
63+
// start(210)
64+
const s$$ = sh.Hot([
65+
next(210, sa$$),
66+
next(220, sb$$),
67+
complete(300)
68+
])
69+
const {results} = sh.start<number>(() => join(s$$))
70+
t.deepEqual(results, [
71+
next(211, 'A0'),
72+
next(220, 'A1'),
73+
next(225, 'B2'),
74+
next(230, 'A2'),
75+
complete(340)
76+
])
77+
})

test/test.MapObservable.ts

Lines changed: 20 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,17 +13,35 @@ const {next, complete} = ReactiveTest
1313

1414
test('MapObservable.subscribe()', t => {
1515
const sh = TestScheduler.of()
16-
const $ = sh.createColdObservable<number>([
16+
const $ = sh.Cold<number>([
1717
next(210, 0),
1818
next(220, 10),
1919
next(230, 20),
2020
complete(250)
2121
])
22-
const {results} = sh.startScheduler(() => map<number>((x: number) => x + 1, $))
22+
const {results} = sh.start(() => map<number>((x: number) => x + 1, $))
2323
t.deepEqual(results, [
2424
next(410, 1),
2525
next(420, 11),
2626
next(430, 21),
2727
complete(450)
2828
])
2929
})
30+
31+
test('MapObservable.subscribe():HOT', t => {
32+
const sh = TestScheduler.of()
33+
const $ = sh.Hot<number>([
34+
next(100, -10),
35+
next(210, 0),
36+
next(220, 10),
37+
next(230, 20),
38+
complete(250)
39+
])
40+
const {results} = sh.start(() => map<number>((x: number) => x + 1, $))
41+
t.deepEqual(results, [
42+
next(210, 1),
43+
next(220, 11),
44+
next(230, 21),
45+
complete(250)
46+
])
47+
})

test/test.ScanObservable.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@ const {next, complete} = ReactiveTest
1111

1212
test('ScanObservable.subscribe()', t => {
1313
const sh = TestScheduler.of()
14-
const $ = sh.createColdObservable<number>([
14+
const $ = sh.Cold<number>([
1515
next(210, 0),
1616
next(220, 1),
1717
next(230, 2),
1818
next(240, 3),
1919
next(250, 4),
2020
complete(250)
2121
])
22-
const {results} = sh.startScheduler(() => scan((a, b) => a + b, 0, $))
22+
const {results} = sh.start(() => scan((a, b) => a + b, 0, $))
2323
t.deepEqual(results, [
2424
next(410, 0),
2525
next(420, 1),

test/test.SliceObservable.ts

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ import {ReactiveTest} from '../src/testing/ReactiveTest';
1212
const {next, complete} = ReactiveTest
1313
test('takeN(0, 3)', t => {
1414
const sh = TestScheduler.of()
15-
const ob$ = sh.createColdObservable([
15+
const ob$ = sh.Cold([
1616
next(0, 1),
1717
next(10, 2),
1818
next(20, 3),
1919
next(30, 4),
2020
next(40, 5),
2121
complete(50)
2222
])
23-
const {results} = sh.startScheduler(() => slice(0, 3, ob$))
23+
const {results} = sh.start(() => slice(0, 3, ob$))
2424
t.deepEqual(results, [
2525
next(200, 1),
2626
next(210, 2),
@@ -31,7 +31,7 @@ test('takeN(0, 3)', t => {
3131

3232
test('takeN(0, Infinity)', t => {
3333
const sh = TestScheduler.of()
34-
const ob$ = sh.createColdObservable([
34+
const ob$ = sh.Cold([
3535
next(0, 1),
3636
next(10, 2),
3737
next(20, 3),
@@ -40,7 +40,7 @@ test('takeN(0, Infinity)', t => {
4040
complete(50),
4141
next(60, 6)
4242
])
43-
const {results} = sh.startScheduler(() => slice(0, Infinity, ob$))
43+
const {results} = sh.start(() => slice(0, Infinity, ob$))
4444
t.deepEqual(results, [
4545
next(200, 1),
4646
next(210, 2),
@@ -53,15 +53,15 @@ test('takeN(0, Infinity)', t => {
5353

5454
test('takeN(1, 3)', t => {
5555
const sh = TestScheduler.of()
56-
const ob$ = sh.createColdObservable([
56+
const ob$ = sh.Cold([
5757
next(0, 1),
5858
next(10, 2),
5959
next(20, 3),
6060
next(30, 4),
6161
next(40, 5),
6262
complete(50)
6363
])
64-
const {results} = sh.startScheduler(() => slice(1, 3, ob$))
64+
const {results} = sh.start(() => slice(1, 3, ob$))
6565
t.deepEqual(results, [
6666
next(210, 2),
6767
next(220, 3),

0 commit comments

Comments
 (0)