Skip to content

Commit 0d24ef4

Browse files
authored
Add related assets to adapter during adapter generation process (apache#3804)
1 parent f757456 commit 0d24ef4

File tree

19 files changed

+811
-84
lines changed

19 files changed

+811
-84
lines changed

streampipes-data-explorer-api/src/main/java/org/apache/streampipes/dataexplorer/api/IDataExplorerSchemaManagement.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,13 +21,16 @@
2121
import org.apache.streampipes.model.datalake.DataLakeMeasure;
2222

2323
import java.util.List;
24+
import java.util.Optional;
2425

2526
public interface IDataExplorerSchemaManagement {
2627

2728
List<DataLakeMeasure> getAllMeasurements();
2829

2930
DataLakeMeasure getById(String elementId);
3031

32+
Optional<DataLakeMeasure> getExistingMeasureByName(String measureName);
33+
3134
DataLakeMeasure createOrUpdateMeasurement(DataLakeMeasure measure);
3235

3336
void deleteMeasurement(String elementId);

streampipes-data-explorer/src/main/java/org/apache/streampipes/dataexplorer/DataExplorerSchemaManagement.java

Lines changed: 27 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,8 @@ public DataLakeMeasure getById(String elementId) {
5151
}
5252

5353
/**
54-
* For new measurements an entry is generated in the database. For existing measurements the schema is updated
54+
* For new measurements an entry is generated in the database. For existing
55+
* measurements the schema is updated
5556
* according to the update strategy defined by the measurement.
5657
*/
5758
@Override
@@ -75,28 +76,28 @@ public DataLakeMeasure createOrUpdateMeasurement(DataLakeMeasure measure) {
7576
*/
7677
private void handleExistingMeasurement(
7778
DataLakeMeasure measure,
78-
DataLakeMeasure existingMeasure
79-
) {
79+
DataLakeMeasure existingMeasure) {
8080
measure.setElementId(existingMeasure.getElementId());
8181
if (DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA.equals(measure.getSchemaUpdateStrategy())) {
8282
// For the update schema strategy the old schema is overwritten with the new one
8383
updateMeasurement(measure);
8484
} else {
85-
// For the extent existing schema strategy the old schema is merged with the new one
85+
// For the extent existing schema strategy the old schema is merged with the new
86+
// one
8687
unifyEventSchemaAndUpdateMeasure(measure, existingMeasure);
8788
}
8889
}
8990

90-
9191
/**
9292
* Returns the existing measure that has the provided measure name
9393
*/
94-
private Optional<DataLakeMeasure> getExistingMeasureByName(String measureName) {
94+
@Override
95+
public Optional<DataLakeMeasure> getExistingMeasureByName(String measureName) {
9596
return dataLakeStorage.findAll()
96-
.stream()
97-
.filter(m -> m.getMeasureName()
98-
.equals(measureName))
99-
.findFirst();
97+
.stream()
98+
.filter(m -> m.getMeasureName()
99+
.equals(measureName))
100+
.findFirst();
100101
}
101102

102103
private static void setDefaultUpdateStrategyIfNoneProvided(DataLakeMeasure measure) {
@@ -117,16 +118,15 @@ public void deleteMeasurement(String elementId) {
117118
@Override
118119
public boolean deleteMeasurementByName(String measureName) {
119120
var measureToDeleteOpt = dataLakeStorage.findAll()
120-
.stream()
121-
.filter(measurement -> measurement.getMeasureName()
122-
.equals(measureName))
123-
.findFirst();
121+
.stream()
122+
.filter(measurement -> measurement.getMeasureName()
123+
.equals(measureName))
124+
.findFirst();
124125

125126
return measureToDeleteOpt.map(measure -> {
126127
dataLakeStorage.deleteElementById(measure.getElementId());
127128
return true;
128-
}
129-
).orElse(false);
129+
}).orElse(false);
130130
}
131131

132132
@Override
@@ -146,16 +146,15 @@ private void setSchemaVersionAndStoreMeasurement(DataLakeMeasure measure) {
146146
}
147147

148148
/**
149-
* First the event schemas of the measurements are merged and then the measure is updated in the database
149+
* First the event schemas of the measurements are merged and then the measure
150+
* is updated in the database
150151
*/
151152
private void unifyEventSchemaAndUpdateMeasure(
152153
DataLakeMeasure measure,
153-
DataLakeMeasure existingMeasure
154-
) {
154+
DataLakeMeasure existingMeasure) {
155155
var properties = getUnifiedEventProperties(
156156
existingMeasure,
157-
measure
158-
);
157+
measure);
159158

160159
measure
161160
.getEventSchema()
@@ -170,26 +169,23 @@ private void unifyEventSchemaAndUpdateMeasure(
170169
*/
171170
private List<EventProperty> getUnifiedEventProperties(
172171
DataLakeMeasure measure1,
173-
DataLakeMeasure measure2
174-
) {
175-
// Combine the event properties from both measures into a single Stream
172+
DataLakeMeasure measure2) {
173+
// Combine the event properties from both measures into a single Stream
176174
var allMeasurementProperties = Stream.concat(
177175
measure1.getEventSchema()
178-
.getEventProperties()
179-
.stream(),
176+
.getEventProperties()
177+
.stream(),
180178
measure2.getEventSchema()
181-
.getEventProperties()
182-
.stream()
183-
);
179+
.getEventProperties()
180+
.stream());
184181

185182
// Filter event properties by removing duplicate runtime names
186183
// If there are duplicate keys, choose the first occurrence
187184
var unifiedEventProperties = allMeasurementProperties
188185
.collect(Collectors.toMap(
189186
EventProperty::getRuntimeName,
190187
Function.identity(),
191-
(eventProperty, eventProperty2) -> eventProperty
192-
))
188+
(eventProperty, eventProperty2) -> eventProperty))
193189
.values();
194190
return new ArrayList<>(unifiedEventProperties);
195191
}

streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResource.java

Lines changed: 21 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -49,42 +49,33 @@ public class DataLakeMeasureResource extends AbstractAuthGuardedRestResource {
4949

5050
public DataLakeMeasureResource() {
5151
this.dataLakeMeasureManagement = new DataExplorerDispatcher().getDataExplorerManager()
52-
.getSchemaManagement();
52+
.getSchemaManagement();
5353
}
5454

55-
@PostMapping(
56-
produces = MediaType.APPLICATION_JSON_VALUE,
57-
consumes = MediaType.APPLICATION_JSON_VALUE
58-
)
55+
@PostMapping(produces = MediaType.APPLICATION_JSON_VALUE, consumes = MediaType.APPLICATION_JSON_VALUE)
5956
public ResponseEntity<DataLakeMeasure> addDataLake(@RequestBody DataLakeMeasure dataLakeMeasure) {
6057
DataLakeMeasure result = this.dataLakeMeasureManagement.createOrUpdateMeasurement(dataLakeMeasure);
6158
return ok(result);
6259
}
6360

6461
/**
65-
* Handles HTTP GET requests to retrieve the entry counts of specified measurements.
62+
* Handles HTTP GET requests to retrieve the entry counts of specified
63+
* measurements.
6664
*
6765
* @param measurementNames A list of measurement names to return the count.
68-
* @return A ResponseEntity containing a map of measurement names and their corresponding entry counts.
66+
* @return A ResponseEntity containing a map of measurement names and their
67+
* corresponding entry counts.
6968
*/
70-
@Operation(
71-
summary = "Retrieve measurement counts",
72-
description = "Retrieves the entry counts for the specified measurements from the data lake.")
73-
@GetMapping(
74-
path = "/count",
75-
produces = MediaType.APPLICATION_JSON_VALUE)
69+
@Operation(summary = "Retrieve measurement counts", description = "Retrieves the entry counts for the specified measurements from the data lake.")
70+
@GetMapping(path = "/count", produces = MediaType.APPLICATION_JSON_VALUE)
7671
public ResponseEntity<Map<String, Integer>> getEntryCountsOfMeasurments(
77-
@Parameter(description = "A list of measurement names to return the count.")
78-
@RequestParam(value = "measurementNames")
79-
List<String> measurementNames
80-
) {
72+
@Parameter(description = "A list of measurement names to return the count.") @RequestParam(value = "measurementNames") List<String> measurementNames) {
8173
var allMeasurements = this.dataLakeMeasureManagement.getAllMeasurements();
8274
var result = new DataExplorerDispatcher()
8375
.getDataExplorerManager()
8476
.getMeasurementCounter(
8577
allMeasurements,
86-
measurementNames
87-
)
78+
measurementNames)
8879
.countMeasurementSizes();
8980
return ok(result);
9081
}
@@ -99,11 +90,20 @@ public ResponseEntity<?> getDataLakeMeasure(@PathVariable("id") String elementId
9990
}
10091
}
10192

93+
@GetMapping(path = "byName/{measureName}", produces = MediaType.APPLICATION_JSON_VALUE)
94+
public ResponseEntity<?> getDataLakeMeasureName(@PathVariable("measureName") String measureName) {
95+
var measure = this.dataLakeMeasureManagement.getExistingMeasureByName(measureName);
96+
if (Objects.nonNull(measure)) {
97+
return ok(measure);
98+
} else {
99+
return notFound();
100+
}
101+
}
102+
102103
@PutMapping(path = "{id}", consumes = MediaType.APPLICATION_JSON_VALUE)
103104
public ResponseEntity<?> updateDataLakeMeasure(
104105
@PathVariable("id") String elementId,
105-
@RequestBody DataLakeMeasure measure
106-
) {
106+
@RequestBody DataLakeMeasure measure) {
107107
if (elementId.equals(measure.getElementId())) {
108108
try {
109109
this.dataLakeMeasureManagement.updateMeasurement(measure);
@@ -115,7 +115,6 @@ public ResponseEntity<?> updateDataLakeMeasure(
115115
return badRequest();
116116
}
117117

118-
119118
@DeleteMapping(path = "{id}", produces = MediaType.APPLICATION_JSON_VALUE)
120119
public ResponseEntity<?> deleteDataLakeMeasure(@PathVariable("id") String elementId) {
121120
try {

ui/deployment/i18n/en.json

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -482,5 +482,10 @@
482482
"Error Details": null,
483483
"Resources": null,
484484
"All {{allResourcesAlias}}": "All {{allResourcesAlias}}",
485-
"{{ widgetTitle }} Clone": "{{ widgetTitle }} Clone"
485+
"{{ widgetTitle }} Clone": "{{ widgetTitle }} Clone",
486+
"Your {{assetTypes}} were successfully added to {{assetIds}}.": "Your {{assetTypes}} were successfully added to {{assetIds}}.",
487+
"Your {{assetTypes}} were successfully deleted from {{assetIds}}.": "Your {{assetTypes}} were successfully deleted from {{assetIds}}.",
488+
"Starting adapter {{adapterName}}": "Starting adapter {{adapterName}}",
489+
"Creating adapter {{adapterName}}": "Creating adapter {{adapterName}}",
490+
"Updating adapter {{adapterName}}": "Updating adapter {{adapterName}}"
486491
}

ui/projects/streampipes/platform-services/src/lib/apis/datalake-rest.service.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,12 @@ export class DatalakeRestService {
7575
.pipe(map(res => res as DataLakeMeasure));
7676
}
7777

78+
getMeasurementByName(name: String): Observable<DataLakeMeasure> {
79+
return this.http
80+
.get(`${this.dataLakeMeasureUrl}/byName/${name}`)
81+
.pipe(map(res => res as DataLakeMeasure));
82+
}
83+
7884
performMultiQuery(
7985
queryParams: DatalakeQueryParameters[],
8086
): Observable<SpQueryResult[]> {

ui/projects/streampipes/platform-services/src/lib/model/assets/asset.model.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ export interface AssetLink {
4343
navigationActive: boolean;
4444
}
4545

46+
export interface LinkageData {
47+
//Data Model to extract AssetLinks from the UI
48+
name: string;
49+
id: string;
50+
type: string;
51+
selected?: boolean | null;
52+
}
53+
4654
export interface Isa95TypeDesc {
4755
label: string;
4856
type: Isa95Type;
@@ -94,6 +102,14 @@ export interface SpAssetModel extends SpAsset {
94102
removable: boolean;
95103
}
96104

105+
export interface SpAssetTreeNode {
106+
assetId: string;
107+
assetName: string;
108+
assets?: SpAssetTreeNode[];
109+
spAssetModelId: string;
110+
flattenPath: any[];
111+
}
112+
97113
export type Isa95Type =
98114
| 'PROCESS_CELL'
99115
| 'PRODUCTION_UNIT'
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
<!--
2+
~ Licensed to the Apache Software Foundation (ASF) under one or more
3+
~ contributor license agreements. See the NOTICE file distributed with
4+
~ this work for additional information regarding copyright ownership.
5+
~ The ASF licenses this file to You under the Apache License, Version 2.0
6+
~ (the "License"); you may not use this file except in compliance with
7+
~ the License. You may obtain a copy of the License at
8+
~
9+
~ http://www.apache.org/licenses/LICENSE-2.0
10+
~
11+
~ Unless required by applicable law or agreed to in writing, software
12+
~ distributed under the License is distributed on an "AS IS" BASIS,
13+
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
~ See the License for the specific language governing permissions and
15+
~ limitations under the License.
16+
~
17+
-->
18+
<div *ngIf="assetsData?.length">
19+
<mat-tree
20+
[dataSource]="dataSource"
21+
[treeControl]="treeControl"
22+
class="asset-tree"
23+
>
24+
<!-- Parent Node Definition -->
25+
<mat-nested-tree-node *matTreeNodeDef="let node; when: hasChild">
26+
<div
27+
class="mat-tree-node"
28+
(click)="onAssetSelect(node)"
29+
[class.selected-node]="isSelected(node)"
30+
>
31+
<button
32+
mat-icon-button
33+
matTreeNodeToggle
34+
[attr.aria-label]="'Toggle ' + node.assetName"
35+
>
36+
<mat-icon>{{
37+
treeControl.isExpanded(node)
38+
? 'expand_more'
39+
: 'chevron_right'
40+
}}</mat-icon>
41+
</button>
42+
<span>{{ node.assetName }}</span>
43+
</div>
44+
<div *ngIf="treeControl.isExpanded(node)" role="group">
45+
<ng-container matTreeNodeOutlet></ng-container>
46+
</div>
47+
</mat-nested-tree-node>
48+
49+
<!-- Leaf Node Definition (no children) -->
50+
<mat-tree-node *matTreeNodeDef="let node" matTreeNodeToggle>
51+
<div
52+
class="mat-tree-node"
53+
(click)="onAssetSelect(node)"
54+
[class.selected-node]="isSelected(node)"
55+
>
56+
<button
57+
mat-icon-button
58+
matTreeNodeToggle
59+
[attr.aria-label]="'Toggle ' + node.assetName"
60+
>
61+
<mat-icon></mat-icon>
62+
</button>
63+
<span>{{ node.assetName }}</span>
64+
</div>
65+
</mat-tree-node>
66+
</mat-tree>
67+
</div>
68+
69+
<!-- If no assets available -->
70+
<div *ngIf="!assetsData?.length">
71+
<p>No assets available</p>
72+
</div>

0 commit comments

Comments
 (0)