Skip to content

Commit 3e45179

Browse files
JHoelliJacqueline HölligJacqueline Hölligdominikriemer
authored
feat: Add retention time configuration (apache#3763)
* init * prettier and lint * first draft UI * first draft ui * first UI drafr * Basic Version of Data Model in Java * Eliminated Stepper * added action to ui * added action to data model * changes to swagger auth * drafted api endpoint * save instance wise scheduler * first cron job draft * Minor * first running scheduling draft * corrected settings cron job * eliminated cron interval in frontend * added elementID in configuration entry * eliminated some enum issued * first working draft * retention working * deleted comments * added retention coloring * cleanup * Revert "init" This reverts commit a76dd1d. * eliminated double lisences * Update streampipes-model/src/main/java/org/apache/streampipes/model/datalake/RetentionTimeConfig.java eliminaated unnecessary line Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com> * Update streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java changed naming of logger Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com> * Update streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java deleted spacing Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com> * Update streampipes-service-core/src/main/java/org/apache/streampipes/service/core/scheduler/DataLakeScheduler.java deleted empty line Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com> * Update ui/src/app/configuration/dialog/data-retention-dialog/model/retention-config.model.ts Deleted Export Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com> * Update ui/src/app/configuration/dialog/data-retention-dialog/components/select-retention/select-retention-action/select-retention-action.component.ts deleted comment Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com> * Update ui/src/app/configuration/dialog/data-retention-dialog/model/data-retention-config.model.ts deleted comment Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com> * new line in record DataRetentionConfig * record new line added correct email * eliminated TODO comment * deleted empty if * fixed type error * changed log to LOF * added Header in DataLakeScheduler * maven issues * fixed maven formatting --------- Co-authored-by: Jacqueline Höllig <jacquelinehollig@Jacquelines-MacBook-Pro.local> Co-authored-by: Jacqueline Höllig <jacquelinehollig@Mac.fritz.box> Co-authored-by: Dominik Riemer <dominik.riemer@gmail.com>
1 parent 39756a7 commit 3e45179

File tree

38 files changed

+1423
-5
lines changed

38 files changed

+1423
-5
lines changed

streampipes-model/src/main/java/org/apache/streampipes/model/datalake/DataLakeMeasure.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,8 @@ public class DataLakeMeasure implements Storable {
5353

5454
private DataLakeMeasureSchemaUpdateStrategy schemaUpdateStrategy = DataLakeMeasureSchemaUpdateStrategy.UPDATE_SCHEMA;
5555

56+
private RetentionTimeConfig retentionTime;
57+
5658
public DataLakeMeasure() {
5759
super();
5860
}
@@ -139,6 +141,15 @@ public void setSchemaUpdateStrategy(DataLakeMeasureSchemaUpdateStrategy schemaUp
139141
this.schemaUpdateStrategy = schemaUpdateStrategy;
140142
}
141143

144+
145+
public void setRetentionTime(RetentionTimeConfig retentionTime) {
146+
this.retentionTime = retentionTime;
147+
}
148+
149+
public RetentionTimeConfig getRetentionTime() {
150+
return retentionTime;
151+
}
152+
142153
/**
143154
* This can be used to get the name of the timestamp property without the stream prefix
144155
*
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.model.datalake;
20+
21+
public record DataRetentionConfig(
22+
RetentionInterval interval,
23+
int olderThanDays,
24+
RetentionAction action) {}
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
20+
package org.apache.streampipes.model.datalake;
21+
22+
public enum RetentionAction {
23+
DELETE,
24+
SAVE,
25+
SAVEDELETE
26+
}
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.model.datalake;
20+
21+
public record RetentionExportConfig() {}
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.model.datalake;
20+
21+
public enum RetentionInterval {
22+
DAILY,
23+
MONTHLY,
24+
WEEKLY
25+
}
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
19+
package org.apache.streampipes.model.datalake;
20+
//, RetentionExportConfig exportConfig
21+
public record RetentionTimeConfig(
22+
DataRetentionConfig dataRetentionConfig) {}

streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeMeasureResource.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,7 @@ public ResponseEntity<?> updateDataLakeMeasure(
115115
return badRequest();
116116
}
117117

118+
118119
@DeleteMapping(path = "{id}", produces = MediaType.APPLICATION_JSON_VALUE)
119120
public ResponseEntity<?> deleteDataLakeMeasure(@PathVariable("id") String elementId) {
120121
try {

streampipes-rest/src/main/java/org/apache/streampipes/rest/impl/datalake/DataLakeResource.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
2626
import org.apache.streampipes.model.datalake.DataLakeMeasure;
2727
import org.apache.streampipes.model.datalake.DataSeries;
28+
import org.apache.streampipes.model.datalake.RetentionTimeConfig;
2829
import org.apache.streampipes.model.datalake.SpQueryResult;
2930
import org.apache.streampipes.model.datalake.param.ProvidedRestQueryParams;
3031
import org.apache.streampipes.model.message.Notifications;
@@ -400,6 +401,32 @@ private boolean checkProvidedQueryParams(Map<String, String> providedParams) {
400401
return SUPPORTED_PARAMS.containsAll(providedParams.keySet());
401402
}
402403

404+
@PostMapping(
405+
path = "/{elementId}/cleanup",
406+
produces = MediaType.APPLICATION_JSON_VALUE,
407+
consumes = MediaType.APPLICATION_JSON_VALUE)
408+
@Operation(summary = "Sets the retention mechanism for a certain measurement", tags = {"Data Lake"},
409+
responses = {
410+
@ApiResponse(
411+
responseCode = "400",
412+
description = "Can't store the given data to this data lake"),
413+
@ApiResponse(
414+
responseCode = "200",
415+
description = "Successfully stored data")})
416+
public ResponseEntity<?> setDataLakeRetention(
417+
@PathVariable String elementId,
418+
@RequestBody RetentionTimeConfig retention){
419+
var measure = this.dataExplorerSchemaManagement.getById(elementId);
420+
measure.setRetentionTime(retention);
421+
try {
422+
this.dataExplorerSchemaManagement.updateMeasurement(measure);
423+
} catch (IllegalArgumentException e) {
424+
return badRequest(e.getMessage());
425+
}
426+
427+
return ok();
428+
}
429+
403430
private ProvidedRestQueryParams populate(String measurementId, Map<String, String> rawParams) {
404431
Map<String, String> queryParamMap = new HashMap<>();
405432
rawParams.forEach((key, value) -> queryParamMap.put(key, String.join(",", value)));

streampipes-service-core/src/main/java/org/apache/streampipes/service/core/StreamPipesCoreApplication.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@
5656
import org.springframework.context.annotation.ComponentScan;
5757
import org.springframework.context.annotation.Configuration;
5858
import org.springframework.context.annotation.Import;
59+
import org.springframework.scheduling.annotation.EnableScheduling;
5960

6061
import jakarta.annotation.PostConstruct;
6162
import jakarta.annotation.PreDestroy;
@@ -67,6 +68,7 @@
6768

6869
@Configuration
6970
@EnableAutoConfiguration
71+
@EnableScheduling
7072
@Import({
7173
OpenApiConfiguration.class,
7274
SpPermissionEvaluator.class,
@@ -77,7 +79,8 @@
7779
})
7880
@ComponentScan({
7981
"org.apache.streampipes.rest.*",
80-
"org.apache.streampipes.service.core.oauth2"
82+
"org.apache.streampipes.service.core.oauth2",
83+
"org.apache.streampipes.service.core.scheduler"
8184
})
8285
public class StreamPipesCoreApplication extends StreamPipesServiceBase {
8386

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*
17+
*/
18+
package org.apache.streampipes.service.core.scheduler;
19+
20+
import org.apache.streampipes.dataexplorer.api.IDataExplorerQueryManagement;
21+
import org.apache.streampipes.dataexplorer.api.IDataExplorerSchemaManagement;
22+
import org.apache.streampipes.dataexplorer.management.DataExplorerDispatcher;
23+
import org.apache.streampipes.model.datalake.DataLakeMeasure;
24+
25+
import org.slf4j.Logger;
26+
import org.slf4j.LoggerFactory;
27+
import org.springframework.scheduling.annotation.Scheduled;
28+
import org.springframework.stereotype.Component;
29+
30+
import java.time.Instant;
31+
import java.time.temporal.ChronoUnit;
32+
import java.util.List;
33+
34+
35+
@Component
36+
public class DataLakeScheduler {
37+
38+
private static final Logger LOG = LoggerFactory.getLogger(DataLakeScheduler.class);
39+
40+
private final IDataExplorerSchemaManagement dataExplorerSchemaManagement = new DataExplorerDispatcher()
41+
.getDataExplorerManager()
42+
.getSchemaManagement();
43+
44+
private final IDataExplorerQueryManagement dataExplorerQueryManagement = new DataExplorerDispatcher()
45+
.getDataExplorerManager()
46+
.getQueryManagement(this.dataExplorerSchemaManagement);
47+
48+
public void exportMeasurements() {
49+
// Method body is empty; add functionality as needed
50+
}
51+
52+
public void deleteMeasurements(DataLakeMeasure m) {
53+
Instant now = Instant.now();
54+
Instant daysAgo = now.minus(m.getRetentionTime().dataRetentionConfig().olderThanDays(), ChronoUnit.DAYS);
55+
56+
long endDate = daysAgo.toEpochMilli();
57+
LOG.info("Current time in millis: " + now.toEpochMilli());
58+
LOG.info("Current time in millis to delete: " + endDate);
59+
60+
this.dataExplorerQueryManagement.deleteData(m.getMeasureName(), null, endDate);
61+
}
62+
63+
@Scheduled(cron = "0 1 0 * * 6") // CronJob Scheduled every Saturday (5) 00:01
64+
public void cleanupMeasurements() {
65+
List<DataLakeMeasure> allMeasurements = this.dataExplorerSchemaManagement.getAllMeasurements();
66+
LOG.info("GET ALL Measurements");
67+
for (DataLakeMeasure m : allMeasurements) {
68+
if (m.getRetentionTime() != null) {
69+
LOG.info("Start delete Measurement " + m.getMeasureName());
70+
deleteMeasurements(m);
71+
LOG.info("Measurements " + m.getMeasureName() + " successfully deleted");
72+
}
73+
}
74+
}
75+
}

0 commit comments

Comments
 (0)