Skip to content

Commit b74a5e1

Browse files
authored
refactor(#3622): Migrate processors in processors-transformation-jvm to use IStreamPipesDataProcessor (#3625)
* refactor(#3622): Update processors to use IStreamPipesDataProcessor * refactor(#3622): Update further processors to use IStreamPipesDataProcessor
1 parent 95ab2dc commit b74a5e1

File tree

28 files changed

+1088
-1028
lines changed

28 files changed

+1088
-1028
lines changed

streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/count/CountArrayProcessor.java

Lines changed: 26 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -18,71 +18,69 @@
1818

1919
package org.apache.streampipes.processors.transformation.jvm.processor.array.count;
2020

21-
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
21+
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
22+
import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
2223
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
24+
import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
2325
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
2426
import org.apache.streampipes.model.DataProcessorType;
2527
import org.apache.streampipes.model.extensions.ExtensionAssetType;
26-
import org.apache.streampipes.model.graph.DataProcessorDescription;
2728
import org.apache.streampipes.model.runtime.Event;
2829
import org.apache.streampipes.model.runtime.field.AbstractField;
2930
import org.apache.streampipes.model.schema.PropertyScope;
3031
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
3132
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
33+
import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
3234
import org.apache.streampipes.sdk.helpers.EpProperties;
3335
import org.apache.streampipes.sdk.helpers.EpRequirements;
3436
import org.apache.streampipes.sdk.helpers.Labels;
3537
import org.apache.streampipes.sdk.helpers.Locales;
3638
import org.apache.streampipes.sdk.helpers.OutputStrategies;
3739
import org.apache.streampipes.vocabulary.SO;
38-
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
39-
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
4040

4141
import java.util.List;
4242

43-
public class CountArrayProcessor extends StreamPipesDataProcessor {
43+
public class CountArrayProcessor implements IStreamPipesDataProcessor {
4444

4545
public static final String COUNT_NAME = "countValue";
4646
public static final String ARRAY_FIELD = "array-field";
4747

4848
private String arrayField;
4949

5050
@Override
51-
public DataProcessorDescription declareModel() {
52-
return ProcessingElementBuilder
53-
.create("org.apache.streampipes.processors.transformation.jvm.count-array", 0)
54-
.category(DataProcessorType.COUNT_OPERATOR)
55-
.withLocales(Locales.EN)
56-
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
57-
.requiredStream(
58-
StreamRequirementsBuilder.create()
59-
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
60-
Labels.withId(ARRAY_FIELD), PropertyScope.NONE)
61-
.build())
62-
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(Labels.empty(), COUNT_NAME,
63-
SO.NUMBER)))
64-
.build();
51+
public IDataProcessorConfiguration declareConfig() {
52+
return DataProcessorConfiguration.create(
53+
CountArrayProcessor::new,
54+
ProcessingElementBuilder
55+
.create("org.apache.streampipes.processors.transformation.jvm.count-array", 0)
56+
.category(DataProcessorType.COUNT_OPERATOR)
57+
.withLocales(Locales.EN)
58+
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
59+
.requiredStream(
60+
StreamRequirementsBuilder.create()
61+
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
62+
Labels.withId(ARRAY_FIELD), PropertyScope.NONE)
63+
.build())
64+
.outputStrategy(OutputStrategies.append(EpProperties.doubleEp(Labels.empty(), COUNT_NAME, SO.NUMBER)))
65+
.build()
66+
);
6567
}
6668

6769
@Override
68-
public void onInvocation(ProcessorParams parameters,
69-
SpOutputCollector spOutputCollector,
70-
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
70+
public void onPipelineStarted(IDataProcessorParameters parameters,
71+
SpOutputCollector spOutputCollector,
72+
EventProcessorRuntimeContext runtimeContext) {
7173
this.arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD);
7274
}
7375

7476
@Override
75-
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
76-
77+
public void onEvent(Event event, SpOutputCollector collector) {
7778
List<AbstractField> allEvents = event.getFieldBySelector(arrayField).getAsList().getRawValue();
78-
7979
event.addField(CountArrayProcessor.COUNT_NAME, allEvents.size());
80-
8180
collector.collect(event);
8281
}
8382

8483
@Override
85-
public void onDetach() throws SpRuntimeException {
86-
84+
public void onPipelineStopped() {
8785
}
8886
}

streampipes-extensions/streampipes-processors-transformation-jvm/src/main/java/org/apache/streampipes/processors/transformation/jvm/processor/array/split/SplitArrayProcessor.java

Lines changed: 60 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -18,13 +18,14 @@
1818

1919
package org.apache.streampipes.processors.transformation.jvm.processor.array.split;
2020

21-
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
21+
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
22+
import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
2223
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
24+
import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
2325
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
2426
import org.apache.streampipes.extensions.api.runtime.ResolvesContainerProvidedOutputStrategy;
2527
import org.apache.streampipes.model.DataProcessorType;
2628
import org.apache.streampipes.model.extensions.ExtensionAssetType;
27-
import org.apache.streampipes.model.graph.DataProcessorDescription;
2829
import org.apache.streampipes.model.graph.DataProcessorInvocation;
2930
import org.apache.streampipes.model.runtime.Event;
3031
import org.apache.streampipes.model.runtime.field.AbstractField;
@@ -36,21 +37,20 @@
3637
import org.apache.streampipes.model.schema.PropertyScope;
3738
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
3839
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
40+
import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
3941
import org.apache.streampipes.sdk.extractor.ProcessingElementParameterExtractor;
4042
import org.apache.streampipes.sdk.helpers.EpRequirements;
4143
import org.apache.streampipes.sdk.helpers.Labels;
4244
import org.apache.streampipes.sdk.helpers.Locales;
4345
import org.apache.streampipes.sdk.helpers.OutputStrategies;
44-
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
45-
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
4646

4747
import java.util.ArrayList;
4848
import java.util.List;
4949
import java.util.Map;
5050

51-
public class SplitArrayProcessor extends StreamPipesDataProcessor
52-
implements ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
53-
51+
public class SplitArrayProcessor
52+
implements IStreamPipesDataProcessor,
53+
ResolvesContainerProvidedOutputStrategy<DataProcessorInvocation, ProcessingElementParameterExtractor> {
5454
public static final String KEEP_PROPERTIES_ID = "keep";
5555
public static final String ARRAY_FIELD_ID = "array-field";
5656
public static final String VALUE = "array_value";
@@ -59,28 +59,36 @@ public class SplitArrayProcessor extends StreamPipesDataProcessor
5959
private List<String> keepProperties;
6060

6161
@Override
62-
public DataProcessorDescription declareModel() {
63-
return ProcessingElementBuilder
64-
.create("org.apache.streampipes.processors.transformation.jvm.split-array", 0)
65-
.category(DataProcessorType.TRANSFORM)
66-
.withLocales(Locales.EN)
67-
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
68-
.requiredStream(StreamRequirementsBuilder.create()
69-
.requiredPropertyWithNaryMapping(EpRequirements.anyProperty(),
70-
Labels.withId(KEEP_PROPERTIES_ID),
71-
PropertyScope.NONE)
72-
.requiredPropertyWithUnaryMapping(EpRequirements.listRequirement(),
73-
Labels.withId(ARRAY_FIELD_ID),
74-
PropertyScope.NONE)
75-
.build())
76-
.outputStrategy(OutputStrategies.customTransformation())
77-
.build();
62+
public IDataProcessorConfiguration declareConfig() {
63+
return DataProcessorConfiguration.create(
64+
SplitArrayProcessor::new,
65+
ProcessingElementBuilder
66+
.create("org.apache.streampipes.processors.transformation.jvm.split-array", 0)
67+
.category(DataProcessorType.TRANSFORM)
68+
.withLocales(Locales.EN)
69+
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
70+
.requiredStream(StreamRequirementsBuilder.create()
71+
.requiredPropertyWithNaryMapping(
72+
EpRequirements.anyProperty(),
73+
Labels.withId(KEEP_PROPERTIES_ID),
74+
PropertyScope.NONE
75+
)
76+
.requiredPropertyWithUnaryMapping(
77+
EpRequirements.listRequirement(),
78+
Labels.withId(ARRAY_FIELD_ID),
79+
PropertyScope.NONE
80+
)
81+
.build())
82+
.outputStrategy(OutputStrategies.customTransformation())
83+
.build()
84+
);
7885
}
7986

8087
@Override
81-
public EventSchema resolveOutputStrategy(DataProcessorInvocation processingElement,
82-
ProcessingElementParameterExtractor extractor)
83-
throws SpRuntimeException {
88+
public EventSchema resolveOutputStrategy(
89+
DataProcessorInvocation processingElement,
90+
ProcessingElementParameterExtractor extractor
91+
) {
8492
String arrayFieldSelector = extractor.mappingPropertyValue(ARRAY_FIELD_ID);
8593
List<String> keepPropertySelectors = extractor.mappingPropertyValues(KEEP_PROPERTIES_ID);
8694

@@ -91,41 +99,44 @@ public EventSchema resolveOutputStrategy(DataProcessorInvocation processingEleme
9199
newProperty.setLabel("Array Value");
92100
newProperty.setDescription("Contains values of the array. Created by Split Array processor.");
93101

94-
List<EventProperty> keepProperties = extractor.getEventPropertiesBySelector
95-
(keepPropertySelectors);
102+
List<EventProperty> keepProperties = extractor.getEventPropertiesBySelector(keepPropertySelectors);
96103
outProperties.add(newProperty);
97104
outProperties.addAll(keepProperties);
98105

99106
return new EventSchema(outProperties);
100107
}
101108

102109
@Override
103-
public void onInvocation(ProcessorParams parameters,
104-
SpOutputCollector spOutputCollector,
105-
EventProcessorRuntimeContext runtimeContext) throws SpRuntimeException {
106-
arrayField = parameters.extractor().mappingPropertyValue(ARRAY_FIELD_ID);
107-
keepProperties = parameters.extractor().mappingPropertyValues(KEEP_PROPERTIES_ID);
110+
public void onPipelineStarted(
111+
IDataProcessorParameters parameters,
112+
SpOutputCollector spOutputCollector,
113+
EventProcessorRuntimeContext runtimeContext
114+
) {
115+
arrayField = parameters.extractor()
116+
.mappingPropertyValue(ARRAY_FIELD_ID);
117+
keepProperties = parameters.extractor()
118+
.mappingPropertyValues(KEEP_PROPERTIES_ID);
108119
}
109120

110121
@Override
111-
public void onEvent(Event event,
112-
SpOutputCollector collector) throws SpRuntimeException {
113-
114-
List<AbstractField> allEvents = event.getFieldBySelector(arrayField).getAsList()
115-
.parseAsCustomType(o -> {
116-
if (o instanceof NestedField) {
117-
return o;
118-
} else if (o instanceof ListField) {
119-
return o;
120-
} else {
121-
return o;
122-
}
123-
});
122+
public void onEvent(Event event, SpOutputCollector collector) {
123+
List<AbstractField> allEvents = event.getFieldBySelector(arrayField)
124+
.getAsList()
125+
.parseAsCustomType(o -> {
126+
if (o instanceof NestedField) {
127+
return o;
128+
} else if (o instanceof ListField) {
129+
return o;
130+
} else {
131+
return o;
132+
}
133+
});
124134

125135
for (AbstractField field : allEvents) {
126136
Event outEvent = new Event();
127137
if (field instanceof NestedField) {
128-
for (Map.Entry<String, AbstractField> key : ((NestedField) field).getRawValue().entrySet()) {
138+
for (Map.Entry<String, AbstractField> key : ((NestedField) field).getRawValue()
139+
.entrySet()) {
129140
outEvent.addField(key.getValue());
130141
}
131142
} else {
@@ -141,7 +152,7 @@ public void onEvent(Event event,
141152
}
142153

143154
@Override
144-
public void onDetach() throws SpRuntimeException {
145-
155+
public void onPipelineStopped() {
146156
}
147157
}
158+

0 commit comments

Comments
 (0)