Skip to content

Commit 2ae171b

Browse files
authored
Merge pull request #238 from tusharmath/frames-scheduler
Frames scheduler
2 parents 326d61c + ff1d9d0 commit 2ae171b

16 files changed

+132
-61
lines changed

.prettierrc

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
{
22
"printWidth": 80,
33
"semi": false,
4-
"singleQuote": true
4+
"singleQuote": true,
5+
"bracketSpacing": false
56
}

API.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -549,8 +549,8 @@ Takes in multiple sources and a sample source and returns a new observable which
549549

550550
```ts
551551
O.sample(
552-
O.interval(1000),
553552
(a, b) => [a, b],
553+
O.interval(1000),
554554
[
555555
O.mapTo('A', O.interval(100)),
556556
O.mapTo('B', O.interval(200))

benchmarks/bm.concat.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@ export function bm_concat(suite: Suite) {
1313
return suite.add(
1414
'(file, file) -> concat',
1515
(d: IDeferred) => run(concat(fromArray(a), fromArray(b)), d),
16-
{defer: true}
16+
{
17+
defer: true
18+
}
1719
)
1820
}

benchmarks/bm.mergeMap.ts

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@ export function bm_mergeMap(suite: Suite) {
1414
return suite.add(
1515
'file -> mergeMap',
1616
(d: IDeferred) => run(mergeMap(just(1e2), fromArray, fromArray(a)), d),
17-
{defer: true}
17+
{
18+
defer: true
19+
}
1820
)
1921
}

package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
"build": "rollup -c ./config/rollup.config.js",
1616
"lint": "git ls-files | grep '.ts$' | xargs tslint",
1717
"lint:fix": "git ls-files | grep '.ts$' | xargs tslint --fix",
18-
"prettier": "git ls-files | grep '.ts$' | xargs prettier --print-width 80 --write --single-quote --no-semi --no-bracket-spacing",
18+
"prettier": "git ls-files | grep '.ts$' | xargs prettier --write --config=.prettierrc",
1919
"travis-deploy-once": "travis-deploy-once",
2020
"semantic-release": "semantic-release"
2121
},

src/lib/Periodic.ts

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
import {ISafeObserver} from './SafeObserver'
2+
import {ISubscription} from './Subscription'
3+
4+
export class Periodic implements ISubscription {
5+
protected sub: ISubscription
6+
protected sink: ISafeObserver<void>
7+
8+
onEvent() {
9+
this.sink.next(undefined)
10+
if (this.sink.erred) this.unsubscribe()
11+
}
12+
13+
unsubscribe(): void {
14+
this.sub.unsubscribe()
15+
}
16+
17+
get closed() {
18+
return this.sub.closed
19+
}
20+
}

src/lib/SafeObserver.ts

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,13 @@ import {CompleteMixin, ErrorMixin, Virgin} from './Mixins'
55
import {IObserver} from './Observer'
66
import {ISafeFunction, tryCatch} from './Utils'
77

8+
export interface ISafeObserver<T> extends IObserver<T> {
9+
erred: boolean
10+
}
811
class SafeObserver<T> extends ErrorMixin(CompleteMixin(Virgin))
912
implements IObserver<T> {
1013
private _next: ISafeFunction<void, IObserver<T>>
14+
erred = false
1115

1216
constructor(public sink: IObserver<T>) {
1317
super()
@@ -16,17 +20,14 @@ class SafeObserver<T> extends ErrorMixin(CompleteMixin(Virgin))
1620

1721
next(val: T): void {
1822
const r = this._next.call(this.sink, val)
19-
if (r.isError()) this.sink.error(r.getError())
23+
if (r.isError) this.error(r.getError())
2024
}
2125

2226
error(err: Error): void {
27+
this.erred = true
2328
this.sink.error(err)
2429
}
25-
26-
complete(): void {
27-
this.sink.complete()
28-
}
2930
}
3031

31-
export const safeObserver = <T>(observer: IObserver<T>): IObserver<T> =>
32+
export const safeObserver = <T>(observer: IObserver<T>): ISafeObserver<T> =>
3233
observer instanceof SafeObserver ? observer : new SafeObserver(observer)

src/lib/Scheduler.ts

Lines changed: 37 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,9 +5,15 @@ import {ISubscription} from './Subscription'
55

66
export interface IScheduler {
77
delay(task: () => void, relativeTime: number): ISubscription
8+
89
periodic(task: () => void, interval: number): ISubscription
10+
911
frame(task: () => void): ISubscription
12+
13+
frames(task: () => void): ISubscription
14+
1015
asap(task: () => void): ISubscription
16+
1117
now(): number
1218
}
1319

@@ -33,6 +39,7 @@ class Periodic implements ISubscription {
3339
clearInterval(this.id)
3440
}
3541
}
42+
3643
class Delay implements ISubscription {
3744
closed = false
3845

@@ -52,6 +59,7 @@ class Delay implements ISubscription {
5259
}
5360
}
5461
}
62+
5563
class ASAP implements ISubscription {
5664
closed = false
5765

@@ -69,7 +77,8 @@ class ASAP implements ISubscription {
6977
if (this.closed === false) this.closed = true
7078
}
7179
}
72-
class Frames implements ISubscription {
80+
81+
class Frame implements ISubscription {
7382
closed = false
7483
private frame: number
7584

@@ -89,8 +98,34 @@ class Frames implements ISubscription {
8998
cancelAnimationFrame(this.frame)
9099
}
91100
}
101+
102+
class Frames implements ISubscription {
103+
closed = false
104+
private frame: number
105+
106+
constructor(private task: () => void) {
107+
this.onEvent = this.onEvent.bind(this)
108+
this.frame = requestAnimationFrame(this.onEvent)
109+
}
110+
111+
onEvent() {
112+
this.task()
113+
this.frame = requestAnimationFrame(this.onEvent)
114+
}
115+
116+
unsubscribe(): void {
117+
if (this.closed) return
118+
this.closed = true
119+
cancelAnimationFrame(this.frame)
120+
}
121+
}
122+
92123
class Scheduler implements IScheduler {
93124
frame(task: () => void): ISubscription {
125+
return new Frame(task)
126+
}
127+
128+
frames(task: () => void): ISubscription {
94129
return new Frames(task)
95130
}
96131

@@ -110,4 +145,5 @@ class Scheduler implements IScheduler {
110145
return Date.now()
111146
}
112147
}
148+
113149
export const createScheduler = (): IScheduler => new Scheduler()

src/lib/Utils.ts

Lines changed: 4 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ export function curry(f: Function, l: number = f.length): any {
1212
}
1313

1414
export interface ISafeValue<T> {
15-
isError(): boolean
15+
isError: boolean
1616
getValue(): T
1717
getError(): Error
1818
}
@@ -22,11 +22,7 @@ export interface ISafeFunction<V, C> {
2222
}
2323

2424
class Guarded<T> implements ISafeValue<T> {
25-
constructor(private value: Error | T) {}
26-
27-
isError(): boolean {
28-
return this.value instanceof Error
29-
}
25+
constructor(private value: Error | T, readonly isError: boolean) {}
3026

3127
getValue() {
3228
return this.value as T
@@ -43,9 +39,9 @@ class BaseSafeFunction<T extends Function, V, C>
4339

4440
call(ctx: C, ...t: any[]): ISafeValue<V> {
4541
try {
46-
return new Guarded(this.f.apply(ctx, t))
42+
return new Guarded(this.f.apply(ctx, t), false)
4743
} catch (e) {
48-
return new Guarded(e)
44+
return new Guarded(e, true)
4945
}
5046
}
5147
}

src/sources/Frames.ts

Lines changed: 7 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,31 +3,15 @@
33
*/
44
import {IObservable} from '../lib/Observable'
55
import {IObserver} from '../lib/Observer'
6-
import {safeObserver} from '../lib/SafeObserver'
6+
import {Periodic} from '../lib/Periodic'
7+
import {ISafeObserver, safeObserver} from '../lib/SafeObserver'
78
import {IScheduler} from '../lib/Scheduler'
89
import {ISubscription} from '../lib/Subscription'
910

10-
class RAFSubscription implements ISubscription {
11-
observer: IObserver<number>
12-
subscription: ISubscription
13-
closed = false
14-
15-
constructor(private sink: IObserver<void>, private scheduler: IScheduler) {
16-
this.schedule()
17-
}
18-
19-
private schedule() {
20-
this.subscription = this.scheduler.frame(this.onFrame)
21-
}
22-
23-
onFrame = () => {
24-
if (this.closed) return
25-
this.sink.next(undefined)
26-
this.schedule()
27-
}
28-
29-
unsubscribe(): void {
30-
this.closed = true
11+
class RAFSubscription extends Periodic {
12+
constructor(readonly sink: ISafeObserver<void>, scheduler: IScheduler) {
13+
super()
14+
this.sub = scheduler.frames(this.onEvent.bind(this))
3115
}
3216
}
3317

@@ -36,6 +20,7 @@ class FrameObservable implements IObservable<void> {
3620
return new RAFSubscription(safeObserver(observer), scheduler)
3721
}
3822
}
23+
3924
export function frames(): IObservable<void> {
4025
return new FrameObservable()
4126
}

src/sources/FromArray.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,10 @@ class FromArraySubscription<T> implements ISubscription {
1919
this.subscription = scheduler.asap(this.executeSafely.bind(this))
2020
}
2121

22+
// TODO: use mixins
2223
private executeSafely() {
2324
const r = tryCatch(this.execute).call(this)
24-
if (r.isError()) this.sink.error(r.getError())
25+
if (r.isError) this.sink.error(r.getError())
2526
}
2627

2728
execute() {

src/sources/Interval.ts

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -3,29 +3,19 @@
33
*/
44
import {IObservable} from '../lib/Observable'
55
import {IObserver} from '../lib/Observer'
6-
import {safeObserver} from '../lib/SafeObserver'
6+
import {Periodic} from '../lib/Periodic'
7+
import {ISafeObserver, safeObserver} from '../lib/SafeObserver'
78
import {IScheduler} from '../lib/Scheduler'
89
import {ISubscription} from '../lib/Subscription'
910

10-
class TimerSubscription implements ISubscription {
11-
closed = false
12-
subscription: ISubscription
13-
11+
class TimerSubscription extends Periodic {
1412
constructor(
15-
private sink: IObserver<void>,
13+
readonly sink: ISafeObserver<void>,
1614
scheduler: IScheduler,
1715
interval: number
1816
) {
19-
this.subscription = scheduler.periodic(this.onFrame.bind(this), interval)
20-
}
21-
22-
onFrame() {
23-
this.sink.next(undefined)
24-
}
25-
26-
unsubscribe(): void {
27-
this.closed = true
28-
this.subscription.unsubscribe()
17+
super()
18+
this.sub = scheduler.periodic(this.onEvent.bind(this), interval)
2919
}
3020
}
3121

src/testing/TestScheduler.ts

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,10 @@ export class TestScheduler implements IScheduler {
8888
return this.delay(task, this.now() + this.rafTimeout, 0)
8989
}
9090

91+
frames(task: () => void): ISubscription {
92+
return this.periodic(task, this.rafTimeout)
93+
}
94+
9195
periodic(task: () => void, interval: number): ISubscription {
9296
var closed = false
9397
const repeatedTask = () => {
@@ -97,7 +101,9 @@ export class TestScheduler implements IScheduler {
97101
}
98102
this.delay(repeatedTask, interval)
99103
return {
100-
closed,
104+
get closed() {
105+
return closed
106+
},
101107
unsubscribe() {
102108
closed = true
103109
}

test/test.Frames.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,11 +6,22 @@ import {scan} from '../src/operators/Scan'
66
import {frames} from '../src/sources/Frames'
77
import {toMarble} from '../src/testing/Marble'
88
import {createTestScheduler} from '../src/testing/TestScheduler'
9+
import {throwError} from '../src/testing/Thrower'
910

1011
describe('frames()', () => {
1112
it('should emit requestAnimationFrame events', () => {
1213
const sh = createTestScheduler(10)
1314
const {results} = sh.start(() => scan(i => i + 1, -1, frames()), 200, 250)
1415
t.strictEqual(toMarble(results), '-0123')
1516
})
17+
18+
it('should capture internal errors', () => {
19+
const sh = createTestScheduler(20)
20+
const reduce = (i: number) => {
21+
if (i === 5) throwError('Yo Air')
22+
return i + 1
23+
}
24+
const {results} = sh.start(() => scan(reduce, -1, frames()), 200, 500)
25+
t.strictEqual(toMarble(results), '--0-1-2-3-4-5-#')
26+
})
1627
})

test/test.FromArray.ts

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,11 +12,22 @@ import {ERROR_MESSAGE, throwError} from '../src/testing/Thrower'
1212
const {next, error} = EVENT
1313

1414
describe('fromArray()', () => {
15+
const throwMessage = (message: string) => {
16+
throw message
17+
}
1518
it('should emit array values as events', () => {
1619
const sh = createTestScheduler()
1720
const testFunction = (x: any) =>
1821
x === 2 ? throwError(ERROR_MESSAGE) : x * 100
1922
const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3])))
2023
t.deepEqual(results, [next(201, 100), error(201, new Error(ERROR_MESSAGE))])
2124
})
25+
26+
it('should handle thrown non Error exceptions', () => {
27+
const sh = createTestScheduler()
28+
const testFunction = (x: any) =>
29+
x === 2 ? throwMessage(ERROR_MESSAGE) : x * 100
30+
const {results} = sh.start(() => map(testFunction, fromArray([1, 2, 3])))
31+
t.deepEqual(results, [next(201, 100), error(201, ERROR_MESSAGE as any)])
32+
})
2233
})

0 commit comments

Comments
 (0)