Skip to content
This repository was archived by the owner on May 14, 2025. It is now read-only.

Commit 576a888

Browse files
BoykoAlexilayaperumalg
authored andcommitted
Unbound stream applications
- DSL client side parser supports comma separated app list. - Client side tokenization/parsing extended to allow extra wildcards in destination names. - Tests added for new features and adjusted existing tests. App List: add type App in the filter App type: review color label
1 parent 7b69fad commit 576a888

19 files changed

+628
-157
lines changed

ui/src/app/apps/apps.service.spec.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ describe('AppsService', () => {
9393

9494
const httpUri = this.mockHttp.delete.calls.mostRecent().args[0];
9595
const headerArgs = this.mockHttp.delete.calls.mostRecent().args[1].headers;
96-
expect(httpUri).toEqual('/apps/0/blubba');
96+
expect(httpUri).toEqual('/apps/1/blubba');
9797
expect(headerArgs.get('Content-Type')).toEqual('application/json');
9898
expect(headerArgs.get('Accept')).toEqual('application/json');
9999

ui/src/app/apps/components/app-list-bar/app-list-bar.component.html

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
</a>
3535
<ul class="dropdown-menu dropdown-menu-right" *dropdownMenu="">
3636
<li [class.active]="form.type == '' || form.type == null"><a (click)="form.type = ''">All types</a></li>
37+
<li [class.active]="form.type == 'app'"><a (click)="form.type = 'app'">App</a></li>
3738
<li [class.active]="form.type == 'source'"><a (click)="form.type = 'source'">Source</a></li>
3839
<li [class.active]="form.type == 'processor'"><a (click)="form.type = 'processor'">Processor</a></li>
3940
<li [class.active]="form.type == 'sink'"><a (click)="form.type = 'sink'">Sink</a></li>

ui/src/app/apps/components/app-type/app-type.component.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,9 @@ export class AppTypeComponent implements AfterContentInit, DoCheck {
5252
this.label = this.application.type.toString().toUpperCase();
5353

5454
switch (this.label) {
55+
case 'APP':
56+
this.labelClass = 'app';
57+
break;
5558
case 'TASK':
5659
this.labelClass = 'danger';
5760
break;

ui/src/app/shared/model/application-type.ts

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,11 @@
55
*
66
*/
77
export enum ApplicationType {
8+
/**
9+
* An applicatioin type that can have a number of input and output channels
10+
*/
11+
app,
12+
813
/**
914
* An application type that appears in a stream, at first position.
1015
*/

ui/src/app/shared/services/parser.spec.ts

Lines changed: 100 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ describe('parser:', () => {
1515
expect(line.errors).toBeNull();
1616
node = line.nodes[0];
1717
expect(node.group).toEqual('UNKNOWN_0');
18-
expect(node.type).toEqual('source');
18+
expect(node.type).toEqual('app');
1919
expect(node.name).toEqual('time');
2020
expect(node.options.size).toEqual(0);
2121
expect(node.optionsranges.size).toEqual(0);
@@ -59,6 +59,34 @@ describe('parser:', () => {
5959
expectChannels(node, 'foo');
6060
});
6161

62+
it('channel input with extended character set', () => {
63+
parseResult = Parser.parse(':foo*/# > log', 'stream');
64+
expect(parseResult.lines.length).toEqual(1);
65+
line = parseResult.lines[0];
66+
nodes = parseResult.lines[0].nodes;
67+
expect(nodes.length).toEqual(1);
68+
node = nodes[0];
69+
expect(node.group).toEqual('UNKNOWN_0');
70+
expect(node.type).toEqual('sink');
71+
expect(node.name).toEqual('log');
72+
expectRange(node.range, 10, 0, 13, 0);
73+
expectChannels(node, 'foo*/#');
74+
});
75+
76+
it('channel input with extended character set 2', () => {
77+
parseResult = Parser.parse(':*/foo > :*bar/foo#', 'stream');
78+
expect(parseResult.lines.length).toEqual(1);
79+
line = parseResult.lines[0];
80+
nodes = parseResult.lines[0].nodes;
81+
expect(nodes.length).toEqual(1);
82+
node = nodes[0];
83+
expect(node.group).toEqual('UNKNOWN_0');
84+
expect(node.type).toEqual('processor');
85+
expect(node.name).toEqual('bridge');
86+
expectRange(node.range, 7, 0, 8, 0);
87+
expectChannels(node, '*/foo', '*bar/foo#');
88+
});
89+
6290
it('channel output', () => {
6391
parseResult = Parser.parse('time > :foo', 'stream');
6492
expect(parseResult.lines.length).toEqual(1);
@@ -73,6 +101,24 @@ describe('parser:', () => {
73101
expectChannels(node, null, 'foo');
74102
});
75103

104+
it('long running apps', () => {
105+
parseResult = Parser.parse('aaa, bbb', 'stream');
106+
expect(parseResult.lines.length).toEqual(1);
107+
line = parseResult.lines[0];
108+
nodes = parseResult.lines[0].nodes;
109+
expect(nodes.length).toEqual(2);
110+
node = nodes[0];
111+
expect(node.group).toEqual('UNKNOWN_0');
112+
expect(node.type).toEqual('app');
113+
expect(node.name).toEqual('aaa');
114+
expectRange(node.range, 0, 0, 3, 0);
115+
node = nodes[1];
116+
expect(node.group).toEqual('UNKNOWN_0');
117+
expect(node.type).toEqual('app');
118+
expect(node.name).toEqual('bbb');
119+
expectRange(node.range, 5, 0, 8, 0);
120+
});
121+
76122
it('options', () => {
77123
parseResult = Parser.parse('time --a=b --c=d | log --e=f', 'stream');
78124
expect(parseResult.lines.length).toEqual(1);
@@ -148,6 +194,40 @@ describe('parser:', () => {
148194
expectRange(error.range, 0, 0, 0, 0);
149195
});
150196

197+
it('list of apps - error checking', () => {
198+
parseResult = Parser.parse(':aaa > fff,bbb', 'stream');
199+
error = parseResult.lines[0].errors[0];
200+
expect(error.message).toEqual('Don\'t use comma with channels');
201+
expectRange(error.range, 10, 0, 11, 0);
202+
parseResult = Parser.parse('aaa,bbb > :zzz', 'stream');
203+
error = parseResult.lines[0].errors[0];
204+
expect(error.message).toEqual('Don\'t use comma with channels');
205+
expectRange(error.range, 3, 0, 4, 0);
206+
parseResult = Parser.parse('aaa | bbb, ccc', 'stream');
207+
error = parseResult.lines[0].errors[0];
208+
expect(error.message).toEqual('Don\'t mix pipe and comma');
209+
expectRange(error.range, 4, 0, 5, 0);
210+
parseResult = Parser.parse('aaa, bbb| ccc', 'stream');
211+
error = parseResult.lines[0].errors[0];
212+
expect(error.message).toEqual('Don\'t mix pipe and comma');
213+
expectRange(error.range, 3, 0, 4, 0);
214+
215+
parseResult = Parser.parse('aaa | filter --expression=\'#jsonPath(payload,\'\'$.lang\'\')==\'\'en\'\'\'', 'stream');
216+
console.log(parseResult);
217+
expect(parseResult.lines[0].nodes[1].options.get('expression')).
218+
toEqual('\'#jsonPath(payload,\'\'$.lang\'\')==\'\'en\'\'\'');
219+
220+
parseResult = Parser.parse('aaa --bbb=ccc,', 'stream');
221+
error = parseResult.lines[0].errors[0];
222+
expect(error.message).toEqual('Out of data');
223+
expectRange(error.range, 14, 0, 15, 0);
224+
225+
parseResult = Parser.parse('aaa --bbb=\'ccc\',', 'stream');
226+
error = parseResult.lines[0].errors[0];
227+
expect(error.message).toEqual('Out of data');
228+
expectRange(error.range, 16, 0, 17, 0);
229+
});
230+
151231
it('error: task with extra data', () => {
152232
parseResult = Parser.parse('aaaa=bbb ccc', 'task');
153233
error = parseResult.lines[0].errors[0];
@@ -276,10 +356,20 @@ describe('parser:', () => {
276356
parseResult = Parser.parse('bbb | bbb', 'stream');
277357
expect(parseResult.lines.length).toEqual(1);
278358
error = parseResult.lines[0].errors[0];
279-
expect(error.message).toEqual('App \'bbb\' should be unique within the stream, use a label to differentiate multiple occurrences');
359+
expect(error.message)
360+
.toEqual('App \'bbb\' should be unique within the definition, use a label to differentiate multiple occurrences');
280361
expectRange(error.range, 6, 0, 9, 0);
281362
});
282363

364+
it('error: label required unmanaged stream app', () => {
365+
parseResult = Parser.parse('bbb, bbb', 'stream');
366+
expect(parseResult.lines.length).toEqual(1);
367+
error = parseResult.lines[0].errors[0];
368+
expect(error.message)
369+
.toEqual('App \'bbb\' should be unique within the definition, use a label to differentiate multiple occurrences');
370+
expectRange(error.range, 5, 0, 8, 0);
371+
});
372+
283373
it('error: rogue option name', () => {
284374
parseResult = Parser.parse('aaa --|=99 ', 'stream');
285375
error = parseResult.lines[0].errors[0];
@@ -391,6 +481,14 @@ describe('parser:', () => {
391481
expect(parseResult.lines[0].nodes[0].name).toEqual('time');
392482
});
393483

484+
it('check label unmanaged stream apps', () => {
485+
parseResult = Parser.parse('aaa, bbb: aaa', 'stream');
486+
expect((<Parser.StreamApp>parseResult.lines[0].nodes[0]).label).toBeUndefined();
487+
expect((<Parser.StreamApp>parseResult.lines[0].nodes[1]).label).toEqual('bbb');
488+
expect(parseResult.lines[0].nodes[0].name).toEqual('aaa');
489+
expect(parseResult.lines[0].nodes[1].name).toEqual('aaa');
490+
});
491+
394492
it('error: bad options 1', () => {
395493
parseResult = Parser.parse('aaa --', 'stream');
396494
expect(parseResult.lines.length).toEqual(1);

ui/src/app/shared/services/parser.ts

Lines changed: 67 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2017 the original author or authors.
2+
* Copyright 2016-2018 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,15 +41,12 @@ class InternalParser {
4141
this.textlines = definitionsText.split('\n');
4242
}
4343

44-
private tokenListToStringList(tokens, delimiter) {
44+
private tokenListToStringList(tokens) {
4545
if (tokens.length === 0) {
4646
return '';
4747
}
4848
let result = '';
4949
for (let t = 0; t < tokens.length; t++) {
50-
if (t > 0) {
51-
result = result + delimiter;
52-
}
5350
result = result + tokens[t].data;
5451
}
5552
return result;
@@ -125,6 +122,25 @@ class InternalParser {
125122
}
126123
}
127124

125+
private peekDestinationComponentToken(mustMatchAndConsumeIfDoes: boolean): Token {
126+
const t = this.peekAtToken();
127+
if (t === null) {
128+
throw {'msg': 'Out of data', 'start': this.text.length};
129+
}
130+
if (!(this.isKind(t, TokenKind.IDENTIFIER) || this.isKind(t, TokenKind.STAR) ||
131+
this.isKind(t, TokenKind.SLASH) || this.isKind(t, TokenKind.HASH))) {
132+
if (mustMatchAndConsumeIfDoes) {
133+
throw {'msg': 'Tokens of kind ' + t.kind + ' cannot be used in a destination', 'start': t.start};
134+
} else {
135+
return null;
136+
}
137+
}
138+
if (mustMatchAndConsumeIfDoes) {
139+
this.nextToken();
140+
}
141+
return t;
142+
}
143+
128144
private eatToken(expectedKind: TokenKind): Token {
129145
const t = this.nextToken();
130146
if (t === null) {
@@ -137,7 +153,7 @@ class InternalParser {
137153
return t;
138154
}
139155

140-
// A destination reference is of the form ':'IDENTIFIER['.'IDENTIFIER]*
156+
// A destination reference is of the form ':'(IDENTIFIER|STAR|SLASH|HASH)['.'(IDENTIFIER|STAR|SLASH|HASH]*
141157
private eatDestinationReference(tapAllowed: boolean): Parser.DestinationReference {
142158
const nameComponents = [];
143159
let t;
@@ -147,16 +163,21 @@ class InternalParser {
147163
throw {'msg': 'Destination must start with a \':\'', 'start': firstToken.start, 'end': firstToken.end};
148164
}
149165
if (!this.isNextTokenAdjacent()) {
150-
t = this.peekAtToken();
166+
t = this.peekAtToken();
151167
if (t) {
152168
throw {'msg': 'No whitespace allowed in destination', 'start': firstToken.end, 'end': t.start};
153169
} else {
154170
throw {'msg': 'Out of data - incomplete destination', 'start': firstToken.start};
155171
}
156172
}
157-
nameComponents.push(this.eatToken(TokenKind.IDENTIFIER)); // the non-optional identifier
173+
let isDotted = false;
174+
nameComponents.push(this.peekDestinationComponentToken(true));
175+
while (this.isNextTokenAdjacent() && (this.peekDestinationComponentToken(false) !== null)) {
176+
nameComponents.push(this.peekDestinationComponentToken(true));
177+
}
158178
while (this.isNextTokenAdjacent() && this.peekToken(TokenKind.DOT)) {
159179
currentToken = this.eatToken(TokenKind.DOT);
180+
isDotted = true;
160181
if (!this.isNextTokenAdjacent()) {
161182
t = this.peekAtToken();
162183
if (t) {
@@ -165,11 +186,15 @@ class InternalParser {
165186
throw {'msg': 'Out of data - incomplete destination', 'start': currentToken.start};
166187
}
167188
}
168-
nameComponents.push(this.eatToken(TokenKind.IDENTIFIER));
189+
nameComponents.push(currentToken);
190+
nameComponents.push(this.peekDestinationComponentToken(true));
191+
while (this.isNextTokenAdjacent() && (this.peekDestinationComponentToken(false) !== null)) {
192+
nameComponents.push(this.peekDestinationComponentToken(true));
193+
}
169194
}
170195
let type = null;
171196
// TODO this does not cope with dotted stream names...
172-
if (nameComponents.length < 2 || !tapAllowed) {
197+
if (!isDotted || !tapAllowed) {
173198
type = 'destination';
174199
} else {
175200
type = 'tap';
@@ -179,7 +204,7 @@ class InternalParser {
179204
type: type,
180205
start: firstToken.start,
181206
end: endpos,
182-
name: (type === 'tap' ? 'tap:' : '') + this.tokenListToStringList(nameComponents, '.')
207+
name: (type === 'tap' ? 'tap:' : '') + this.tokenListToStringList(nameComponents)
183208
};
184209
return destinationObject;
185210
}
@@ -338,19 +363,42 @@ class InternalParser {
338363
// appList: app (| app)*
339364
// A stream may end in a app (if it is a sink) or be followed by
340365
// a sink channel.
341-
private eatAppList(): Parser.AppNode[] {
366+
private eatAppList(preceedingSourceChannelSpecified: boolean): Parser.AppNode[] {
342367
const appNodes: Parser.AppNode[] = [];
368+
let usedListDelimiter = -1;
369+
let usedStreamDelimiter = -1;
343370
appNodes.push(this.eatApp());
344371
while (this.moreTokens()) {
345372
const t = this.peekAtToken();
346373
if (this.isKind(t, TokenKind.PIPE)) {
374+
if (usedListDelimiter >= 0) {
375+
throw {'msg': 'Don\'t mix pipe and comma', 'start': usedListDelimiter};
376+
}
377+
usedStreamDelimiter = t.start;
378+
this.nextToken();
379+
appNodes.push(this.eatApp());
380+
} else if (this.isKind(t, TokenKind.COMMA)) {
381+
if (preceedingSourceChannelSpecified) {
382+
throw {'msg': 'Don\'t use comma with channels', 'start': t.start};
383+
}
384+
if (usedStreamDelimiter >= 0) {
385+
throw {'msg': 'Don\'t mix pipe and comma', 'start': usedStreamDelimiter};
386+
}
387+
usedListDelimiter = t.start;
347388
this.nextToken();
348389
appNodes.push(this.eatApp());
349390
} else {
350391
// might be followed by sink channel
351392
break;
352393
}
353394
}
395+
const isFollowedBySinkChannel = this.peekToken(TokenKind.GT);
396+
if (isFollowedBySinkChannel && usedListDelimiter >= 0) {
397+
throw {'msg': 'Don\'t use comma with channels', 'start': usedListDelimiter};
398+
}
399+
for (let appNumber = 0; appNumber < appNodes.length; appNumber++) {
400+
appNodes[appNumber].nonStreamApp = !preceedingSourceChannelSpecified && !isFollowedBySinkChannel && (usedStreamDelimiter < 0);
401+
}
354402
return appNodes;
355403
}
356404

@@ -449,9 +497,9 @@ class InternalParser {
449497
if (bridge) {
450498
// Create a bridge app to hang the source/sink channels off
451499
this.tokenStreamPointer--; // Rewind so we can nicely eat the sink channel
452-
appNodes = [{'name': 'bridge', 'start': this.peekAtToken().start, 'end': this.peekAtToken().end}];
500+
appNodes = [{'name': 'bridge', 'start': this.peekAtToken().start, 'end': this.peekAtToken().end, 'nonStreamApp': false}];
453501
} else {
454-
appNodes = this.eatAppList();
502+
appNodes = this.eatAppList(sourceChannelNode != null);
455503
}
456504
streamNode.apps = appNodes;
457505
const sinkChannelNode = this.maybeEatSinkChannel();
@@ -552,6 +600,9 @@ class InternalParser {
552600
sinkChannelName = streamdef.sinkChannel.channel.name;
553601
}
554602
app = streamdef.apps[m];
603+
if (app.nonStreamApp) {
604+
expectedType = 'app';
605+
}
555606
options = new Map();
556607
optionsranges = new Map();
557608
if (app.options) {
@@ -587,7 +638,7 @@ class InternalParser {
587638
'\' (at app position ' + m + ') and app \'' + streamdef.apps[previous].name +
588639
'\' (at app position ' + previous + ') both use it'
589640
: 'App \'' + app.name +
590-
'\' should be unique within the stream, use a label to differentiate multiple occurrences',
641+
'\' should be unique within the definition, use a label to differentiate multiple occurrences',
591642
'range': streamObject.range
592643
});
593644
} else {
@@ -696,6 +747,7 @@ export namespace Parser {
696747
options?: Parser.Option[];
697748
start: number;
698749
end: number;
750+
nonStreamApp?: boolean;
699751
}
700752

701753
export interface Option {

0 commit comments

Comments
 (0)