Skip to content

Commit de17d06

Browse files
authored
fix(apache#3833): Add handling for duplicate keys (apache#3834)
1 parent 8e51d30 commit de17d06

File tree

1 file changed

+43
-28
lines changed
  • streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview

1 file changed

+43
-28
lines changed

streampipes-pipeline-management/src/main/java/org/apache/streampipes/manager/preview/PipelinePreview.java

Lines changed: 43 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -73,33 +73,50 @@ public void deletePreview(String previewId) {
7373
}
7474

7575
public Map<String, SpDataStream> getPipelineElementPreviewStreams(String previewId) throws IllegalArgumentException {
76+
7677
return ActivePipelinePreviews
7778
.INSTANCE
7879
.getInvocationGraphs(previewId)
7980
.stream()
80-
.filter(graph -> graph instanceof DataProcessorInvocation || graph instanceof SpDataStream)
81+
.filter(this::isProcessorOrStream)
8182
.collect(Collectors.toMap(
8283
NamedStreamPipesEntity::getElementId,
83-
graph -> {
84-
if (graph instanceof DataProcessorInvocation) {
85-
return ((DataProcessorInvocation) graph).getOutputStream();
86-
} else {
87-
return (SpDataStream) graph;
88-
}
89-
}
84+
this::extractStreamFromElement,
85+
(existing, replacement) -> existing // keep the first stream in case of duplicate keys
9086
));
9187
}
9288

93-
private void rewriteElementIds(List<NamedStreamPipesEntity> pipelineElements,
94-
Map<String, String> elementIdMappings) {
89+
private boolean isProcessorOrStream(NamedStreamPipesEntity pe) {
90+
return pe instanceof DataProcessorInvocation || pe instanceof SpDataStream;
91+
}
92+
93+
private SpDataStream extractStreamFromElement(NamedStreamPipesEntity element) {
94+
if (element instanceof DataProcessorInvocation) {
95+
return ((DataProcessorInvocation) element).getOutputStream();
96+
} else if (element instanceof SpDataStream) {
97+
return (SpDataStream) element;
98+
} else {
99+
throw new IllegalArgumentException("Unsupported graph type: " + element.getClass()
100+
.getSimpleName());
101+
}
102+
103+
}
104+
105+
private void rewriteElementIds(
106+
List<NamedStreamPipesEntity> pipelineElements,
107+
Map<String, String> elementIdMappings
108+
) {
95109
pipelineElements
96110
.forEach(pe -> {
97111
if (pe instanceof DataProcessorInvocation) {
98112
var originalElementId = pe.getElementId();
99-
var newElementId = (String.format(
100-
"%s:%s",
101-
StringUtils.substringBeforeLast(pe.getElementId(), ":"),
102-
RandomStringUtils.randomAlphanumeric(5)));
113+
var newElementId = (
114+
String.format(
115+
"%s:%s",
116+
StringUtils.substringBeforeLast(pe.getElementId(), ":"),
117+
RandomStringUtils.randomAlphanumeric(5)
118+
)
119+
);
103120
pe.setElementId(newElementId);
104121
elementIdMappings.put(originalElementId, newElementId);
105122
} else {
@@ -138,36 +155,34 @@ private void deleteGraphs(String previewId) {
138155
ActivePipelinePreviews.INSTANCE.removePreview(previewId);
139156
}
140157

141-
private void storeGraphs(String previewId,
142-
List<NamedStreamPipesEntity> graphs) {
158+
private void storeGraphs(
159+
String previewId,
160+
List<NamedStreamPipesEntity> graphs
161+
) {
143162
ActivePipelinePreviews.INSTANCE.addActivePreview(previewId, graphs);
144163
}
145164

146165
private String generatePreviewId() {
147-
return UUID.randomUUID().toString();
166+
return UUID.randomUUID()
167+
.toString();
148168
}
149169

150-
private PipelinePreviewModel makePreviewModel(String previewId,
151-
Map<String, String> elementIdMappings) {
170+
private PipelinePreviewModel makePreviewModel(
171+
String previewId,
172+
Map<String, String> elementIdMappings
173+
) {
152174
PipelinePreviewModel previewModel = new PipelinePreviewModel();
153175
previewModel.setPreviewId(previewId);
154176
previewModel.setElementIdMappings(elementIdMappings);
155177

156178
return previewModel;
157179
}
158180

159-
private List<String> collectElementIds(List<NamedStreamPipesEntity> graphs) {
160-
return graphs
161-
.stream()
162-
.map(NamedStreamPipesEntity::getElementId)
163-
.collect(Collectors.toList());
164-
}
165-
166181
private List<InvocableStreamPipesEntity> filter(List<NamedStreamPipesEntity> graphs) {
167182
List<InvocableStreamPipesEntity> dataProcessors = new ArrayList<>();
168183
graphs.stream()
169-
.filter(g -> g instanceof DataProcessorInvocation)
170-
.forEach(p -> dataProcessors.add((DataProcessorInvocation) p));
184+
.filter(g -> g instanceof DataProcessorInvocation)
185+
.forEach(p -> dataProcessors.add((DataProcessorInvocation) p));
171186

172187
return dataProcessors;
173188
}

0 commit comments

Comments
 (0)