Skip to content

Commit 9e5ec8d

Browse files
authored
Merge pull request #230 from tusharmath/support-subject
Support subject
2 parents dec4dc3 + e74a6d7 commit 9e5ec8d

File tree

5 files changed

+149
-1
lines changed

5 files changed

+149
-1
lines changed

API.md

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,12 @@
1414
- [frames](#frames)
1515
- [fromArray](#fromArray)
1616
- [fromDOM](#fromDOM)
17+
- [fromNodeStream](#fromNodeStream)
1718
- [fromPromise](#fromPromise)
1819
- [interval](#interval)
1920
- [just](#just)
2021
- [never](#never)
21-
- [fromNodeStream](#fromNodeStream)
22+
- [subject](#subject)
2223

2324
- [Operators](#operators)
2425
- [combine](#combine)
@@ -333,6 +334,23 @@ O.forEach(console.log, $) // logs `AIR`
333334
function never(): Observable
334335
```
335336

337+
## subject
338+
339+
```ts
340+
function subject(): Observable & Observer
341+
```
342+
`Subject` is a special type that is both and `Observer` and also an `Observable`.
343+
344+
**Example:**
345+
```ts
346+
const $ = O.subject()
347+
348+
O.forEach(console.log, $) // logs A
349+
350+
$.next('A')
351+
352+
```
353+
336354
## fromNodeStream
337355

338356
```ts
@@ -694,6 +712,9 @@ interface TestScheduler extends Scheduler {
694712
695713
// Creates a TestObserver. TestObserver keeps log of when and what type of an event was fired.
696714
Observer (): Observer
715+
716+
// Schedules multiple jobs using an array of tasks
717+
timeline (tasks: [[Number, () => void]]): void
697718
}
698719
```
699720

src/main.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export {sample} from './operators/Sample'
3535
export {scan} from './operators/Scan'
3636
export {skipRepeats} from './operators/SkipRepeats'
3737
export {slice} from './operators/Slice'
38+
export {subject} from './sources/Subject'
3839
export {switchLatest} from './operators/Switch'
3940
export {switchMap} from './operators/Switch'
4041
export {tap} from './operators/Map'

src/sources/Subject.ts

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/**
2+
* Created by tushar on 11/01/18.
3+
*/
4+
import {LinkedListNode} from '../lib/LinkedList'
5+
import {IObservable} from '../lib/Observable'
6+
import {CompositeObserver, IObserver} from '../lib/Observer'
7+
import {IScheduler} from '../lib/Scheduler'
8+
import {ISubscription} from '../lib/Subscription'
9+
10+
class SubjectSubscription<T> implements ISubscription {
11+
constructor(
12+
private cObserver: CompositeObserver<T>,
13+
private node: LinkedListNode<IObserver<T>>
14+
) {}
15+
closed: boolean
16+
17+
unsubscribe(): void {
18+
this.cObserver.remove(this.node)
19+
}
20+
}
21+
22+
export class Subject<T> extends CompositeObserver<T> implements IObservable<T> {
23+
subscribe(observer: IObserver<T>, scheduler: IScheduler): ISubscription {
24+
return new SubjectSubscription(this, this.add(observer))
25+
}
26+
}
27+
export const subject = <T>() => new Subject<T>()

src/testing/TestScheduler.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -164,6 +164,10 @@ export class TestScheduler implements IScheduler {
164164
Observer<T>() {
165165
return new TestObserver<T>(this)
166166
}
167+
168+
timeline(tasks: Array<[number, () => void]>) {
169+
tasks.forEach(([time, task]) => this.delay(task, time))
170+
}
167171
}
168172

169173
export const createTestScheduler = (

test/test.Subject.ts

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
/**
2+
* Created by tushar on 11/01/18.
3+
*/
4+
import * as assert from 'assert'
5+
import {ISubscription} from '../src/lib/Subscription'
6+
import {subject} from '../src/sources/Subject'
7+
import {EVENT} from '../src/testing/Events'
8+
import {createTestScheduler} from '../src/testing/TestScheduler'
9+
10+
describe('subject()', () => {
11+
it('should be an observable+observer', () => {
12+
const sh = createTestScheduler()
13+
const ob = sh.Observer()
14+
const $ = subject()
15+
let sub: ISubscription
16+
17+
// Timeline
18+
sh.timeline([
19+
[210, () => $.next('A')],
20+
[220, () => (sub = $.subscribe(ob, sh))],
21+
[230, () => $.next('B')],
22+
[240, () => $.next('C')],
23+
[250, () => sub.unsubscribe()],
24+
[260, () => $.next('D')],
25+
[270, () => $.complete()]
26+
])
27+
28+
sh.advanceTo(211)
29+
assert.deepEqual(ob.results, []) // nothing gets captured
30+
31+
sh.advanceTo(241)
32+
assert.deepEqual(ob.results, [EVENT.next(230, 'B'), EVENT.next(240, 'C')])
33+
34+
sh.advanceTo(261)
35+
assert.deepEqual(ob.results, [EVENT.next(230, 'B'), EVENT.next(240, 'C')])
36+
37+
sh.advanceTo(271)
38+
assert.deepEqual(ob.results, [EVENT.next(230, 'B'), EVENT.next(240, 'C')])
39+
})
40+
41+
it('should be an observable+observer', () => {
42+
const sh = createTestScheduler()
43+
const ob0 = sh.Observer()
44+
const ob1 = sh.Observer()
45+
const $ = subject()
46+
let sb1: ISubscription
47+
let sb0: ISubscription
48+
49+
// Timeline
50+
sh.timeline([
51+
[210, () => $.next('A')],
52+
// -- 211
53+
[220, () => (sb0 = $.subscribe(ob0, sh))],
54+
[230, () => $.next('B')],
55+
// -- 231
56+
[240, () => (sb1 = $.subscribe(ob1, sh))],
57+
[250, () => $.next('C')],
58+
// -- 251
59+
[260, () => sb1.unsubscribe()],
60+
[270, () => $.next('D')],
61+
// -- 271
62+
[280, () => sb0.unsubscribe()],
63+
[290, () => $.next('E')]
64+
// -- 291
65+
])
66+
67+
sh.advanceTo(211)
68+
assert.deepEqual(ob0.results, [])
69+
assert.deepEqual(ob1.results, [])
70+
71+
sh.advanceTo(231)
72+
assert.deepEqual(ob0.results, [EVENT.next(230, 'B')])
73+
assert.deepEqual(ob1.results, [])
74+
75+
sh.advanceTo(251)
76+
assert.deepEqual(ob0.results, [EVENT.next(230, 'B'), EVENT.next(250, 'C')])
77+
assert.deepEqual(ob1.results, [EVENT.next(250, 'C')])
78+
79+
sh.advanceTo(271)
80+
assert.deepEqual(ob0.results, [
81+
EVENT.next(230, 'B'),
82+
EVENT.next(250, 'C'),
83+
EVENT.next(270, 'D')
84+
])
85+
assert.deepEqual(ob1.results, [EVENT.next(250, 'C')])
86+
87+
sh.advanceTo(291)
88+
assert.deepEqual(ob0.results, [
89+
EVENT.next(230, 'B'),
90+
EVENT.next(250, 'C'),
91+
EVENT.next(270, 'D')
92+
])
93+
assert.deepEqual(ob1.results, [EVENT.next(250, 'C')])
94+
})
95+
})

0 commit comments

Comments
 (0)