Skip to content

Commit 4b695d3

Browse files
authored
Merge pull request #223 from tusharmath/concurrent-stream
Control concurrency using an observable in mergeMap( )
2 parents 268b8ee + 0721de7 commit 4b695d3

File tree

10 files changed

+194
-91
lines changed

10 files changed

+194
-91
lines changed

API.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -418,7 +418,7 @@ function mergeMap(concurrency: number,project: (s) => Observable,source: Observa
418418
**Example:**
419419
```ts
420420
const $ = O.mergeMap(
421-
1,
421+
O.just(1),
422422
(ev) => O.slice(0, 3, O.interval(100)),
423423
O.fromEvent(document, 'click')
424424
)

benchmarks/bm.mergeMap.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44

55
import {Suite} from 'benchmark'
66
import {mergeMap} from '../src/operators/MergeMap'
7+
import {just} from '../src/sources/Create'
78
import {fromArray} from '../src/sources/FromArray'
89
import {array, IDeferred, run} from './lib'
910

@@ -12,7 +13,7 @@ const a = array(1e3).map(i => array(1e3))
1213
export function bm_mergeMap(suite: Suite) {
1314
return suite.add(
1415
'file -> mergeMap',
15-
(d: IDeferred) => run(mergeMap(1e2, fromArray, fromArray(a)), d),
16+
(d: IDeferred) => run(mergeMap(just(1e2), fromArray, fromArray(a)), d),
1617
{defer: true}
1718
)
1819
}

extra.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export class Air<T> implements IObservable<T> {
5050
map<S>(fn: (t: T) => S) {
5151
return new Air(O.map(fn, this))
5252
}
53-
mergeMap<S>(n: number, project: (t: T) => IObservable<S>) {
53+
mergeMap<S>(n: IObservable<number>, project: (t: T) => IObservable<S>) {
5454
return new Air(O.mergeMap(n, project, this))
5555
}
5656
multicast() {

src/lib/Subscription.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,14 @@ export class CompositeSubscription implements ISubscription {
3333
this.subscriptions = new LinkedList<ISubscription>()
3434
}
3535

36+
head() {
37+
return this.subscriptions.head()
38+
}
39+
40+
tail() {
41+
return this.subscriptions.tail()
42+
}
43+
3644
add(d: ISubscription) {
3745
return this.subscriptions.add(d)
3846
}

src/operators/MergeMap.ts

Lines changed: 79 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,42 @@ import {IObserver} from '../lib/Observer'
99
import {IScheduler} from '../lib/Scheduler'
1010
import {CompositeSubscription, ISubscription} from '../lib/Subscription'
1111
import {curry} from '../lib/Utils'
12+
import {just} from '../sources/Create'
1213

1314
export type Project<T, S> = (t: T) => IObservable<S>
1415

16+
// prettier-ignore
17+
export type mergeMapFunctionWithConcurrency = {
18+
<T, S>(concurrency: IObservable<number>, project: Project<T, S>, source: IObservable<T>): IObservable<S>
19+
<T, S>(concurrency: IObservable<number>): {(project: Project<T, S>, source: IObservable<T>): IObservable<S>}
20+
<T, S>(concurrency: IObservable<number>): {(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}}
21+
}
22+
23+
// prettier-ignore
24+
export type mergeMapFunction = {
25+
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
26+
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
27+
<T, S>(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}
28+
}
29+
// prettier-ignore
30+
export type joinFunction = <T>(source: IObservable<IObservable<T>>) => IObservable<T>
31+
32+
class ConcurrencyObserver extends ErrorMixin(Virgin)
33+
implements IObserver<number> {
34+
constructor(
35+
private outer: MergeMapOuterObserver<any, any>,
36+
readonly sink: IObserver<any>
37+
) {
38+
super()
39+
}
40+
41+
next(val: number): void {
42+
this.outer.setConc(val)
43+
}
44+
45+
complete(): void {}
46+
}
47+
1548
class MergeMapInnerObserver<T, S> implements IObserver<S> {
1649
node: LinkedListNode<ISubscription>
1750

@@ -39,27 +72,43 @@ class MergeMapOuterObserver<T, S> extends ErrorMixin(Virgin)
3972
implements IObserver<T> {
4073
private __completed: boolean = false
4174
private __buffer: T[] = []
75+
private __conc = 0
76+
readonly cSub = new CompositeSubscription()
4277

4378
constructor(
44-
readonly conc: number,
79+
conc: IObservable<number>,
4580
readonly proj: Project<T, S>,
4681
readonly sink: IObserver<S>,
47-
readonly cSub: CompositeSubscription,
4882
readonly sh: IScheduler
4983
) {
5084
super()
85+
conc.subscribe(new ConcurrencyObserver(this, sink), sh)
86+
}
87+
88+
private getSpace() {
89+
return this.__conc - this.cSub.length()
90+
}
91+
92+
private subscribeNew() {
93+
while (this.__buffer.length > 0 && this.getSpace() > 0) {
94+
this.next(this.__buffer.shift() as T)
95+
}
96+
}
97+
98+
private unsubscribeOld() {
99+
while (this.getSpace() < 0) {
100+
this.cSub.remove(this.cSub.head())
101+
}
51102
}
52103

53104
next(val: T): void {
54-
if (this.cSub.length() < this.conc + 1) {
105+
if (this.getSpace() > 0) {
55106
const innerObserver = new MergeMapInnerObserver(this)
56107
const node = this.cSub.add(
57108
this.proj(val).subscribe(innerObserver, this.sh)
58109
)
59110
innerObserver.setup(node)
60-
} else {
61-
this.__buffer.push(val)
62-
}
111+
} else this.__buffer.push(val)
63112
}
64113

65114
complete(): void {
@@ -68,61 +117,50 @@ class MergeMapOuterObserver<T, S> extends ErrorMixin(Virgin)
68117
}
69118

70119
checkComplete() {
71-
if (this.__completed && this.cSub.length() === 1) this.sink.complete()
72-
else if (this.__buffer.length > 0) {
73-
if (this.__buffer.length > 0) {
74-
this.next(this.__buffer.shift() as T)
75-
}
76-
}
120+
if (
121+
this.__completed &&
122+
this.cSub.length() === 0 &&
123+
this.__buffer.length === 0
124+
)
125+
this.sink.complete()
126+
else this.subscribeNew()
127+
}
128+
129+
setConc(value: number) {
130+
this.__conc = value
131+
const space = this.getSpace()
132+
if (space > 0) this.subscribeNew()
133+
else if (space < 0) this.unsubscribeOld()
77134
}
78135
}
79136

80137
class MergeMap<T, S> implements IObservable<S> {
81138
constructor(
82-
private conc: number,
139+
private conc: IObservable<number>,
83140
private proj: Project<T, S>,
84141
private src: IObservable<T>
85142
) {}
86143

87144
subscribe(observer: IObserver<S>, scheduler: IScheduler): ISubscription {
88145
const cSub = new CompositeSubscription()
89-
const outerObserver = new MergeMapOuterObserver<T, S>(
90-
this.conc,
91-
this.proj,
92-
observer,
93-
cSub,
94-
scheduler
95-
)
146+
147+
// prettier-ignore
148+
const outerObserver = new MergeMapOuterObserver<T, S>(this.conc, this.proj, observer, scheduler)
149+
cSub.add(outerObserver.cSub)
96150
cSub.add(this.src.subscribe(outerObserver, scheduler))
97151
return cSub
98152
}
99153
}
100154

101-
// prettier-ignore
102-
export type mergeMapFunctionWithConcurrency = {
103-
<T, S>(concurrency: number, project: Project<T, S>, source: IObservable<T>): IObservable<S>
104-
<T, S>(concurrency: number): {(project: Project<T, S>, source: IObservable<T>): IObservable<S>}
105-
<T, S>(concurrency: number): {(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}}
106-
}
107-
108-
// prettier-ignore
109-
export type mergeMapFunction = {
110-
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
111-
<T, S>(project: Project<T, S>, source: IObservable<T>): IObservable<S>
112-
<T, S>(project: Project<T, S>): {(source: IObservable<T>): IObservable<S>}
113-
}
114-
115-
export type joinFunction = <T>(
116-
source: IObservable<IObservable<T>>
117-
) => IObservable<T>
118-
119155
export const mergeMap: mergeMapFunctionWithConcurrency = curry(
120156
<T, S>(
121-
concurrency: number,
157+
concurrency: IObservable<number>,
122158
project: Project<T, S>,
123159
source: IObservable<T>
124160
): IObservable<S> => new MergeMap(concurrency, project, source)
125161
)
126-
export const flatMap = mergeMap(Number.POSITIVE_INFINITY) as mergeMapFunction
127-
export const concatMap: mergeMapFunction = mergeMap(1) as mergeMapFunction
162+
export const flatMap = mergeMap(
163+
just(Number.POSITIVE_INFINITY)
164+
) as mergeMapFunction
165+
export const concatMap: mergeMapFunction = mergeMap(just(1)) as mergeMapFunction
128166
export const join = flatMap(i => i) as joinFunction

src/testing/TestObservable.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ export class TestObservable<T> implements IObservable<T> {
1313

1414
constructor(private func: (observer: IObserver<T>) => ISubscription) {}
1515

16-
get marble() {
16+
toString() {
1717
return toMarble(this.subscriptions)
1818
}
1919

src/testing/TestObserver.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import {toMarble} from './Marble'
99
export class TestObserver<T> implements IObserver<T> {
1010
results: Array<IObservableEvent>
1111

12-
get marble() {
12+
toString() {
1313
return toMarble(this.results)
1414
}
1515

test/test.ForEach.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ describe('forEach()', () => {
2828
'#'
2929
)
3030

31-
const actual = testObservable.marble
31+
const actual = testObservable.toString()
3232
assert.strictEqual(actual, expected)
3333
})
3434
})
@@ -42,7 +42,7 @@ describe('forEach()', () => {
4242

4343
sh.startSubscription(() => forEach(testObserver, $))
4444

45-
const actual = testObserver.marble
45+
const actual = testObserver.toString()
4646
const expected = '-1234|'
4747

4848
assert.strictEqual(actual, expected)
@@ -56,7 +56,7 @@ describe('forEach()', () => {
5656

5757
sh.startSubscription(() => forEach(testObserver, testObservable, sh))
5858

59-
const actual = testObserver.marble
59+
const actual = testObserver.toString()
6060

6161
assert.strictEqual(actual, expected)
6262
})

0 commit comments

Comments
 (0)