Skip to content

Commit 6e90a99

Browse files
authored
refactor(#3655): Update text mining processors to implement IStreamPipesDataProcessor interface (#3665)
1 parent d0294a7 commit 6e90a99

File tree

6 files changed: +308 additions, -220 deletions (lines changed)

streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/chunker/ChunkerProcessor.java

Lines changed: 68 additions & 49 deletions
Original file line number | Diff line number | Diff line change
@@ -19,25 +19,26 @@
1919
package org.apache.streampipes.processors.textmining.jvm.processor.chunker;
2020

2121
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
22+
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
23+
import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
2224
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
25+
import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
2326
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
2427
import org.apache.streampipes.model.DataProcessorType;
2528
import org.apache.streampipes.model.extensions.ExtensionAssetType;
26-
import org.apache.streampipes.model.graph.DataProcessorDescription;
2729
import org.apache.streampipes.model.runtime.Event;
2830
import org.apache.streampipes.model.runtime.field.ListField;
2931
import org.apache.streampipes.model.schema.PropertyScope;
3032
import org.apache.streampipes.processors.textmining.jvm.processor.TextMiningUtil;
3133
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
3234
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
35+
import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
3336
import org.apache.streampipes.sdk.helpers.EpProperties;
3437
import org.apache.streampipes.sdk.helpers.EpRequirements;
3538
import org.apache.streampipes.sdk.helpers.Labels;
3639
import org.apache.streampipes.sdk.helpers.Locales;
3740
import org.apache.streampipes.sdk.helpers.OutputStrategies;
3841
import org.apache.streampipes.sdk.utils.Datatypes;
39-
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
40-
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
4142

4243
import opennlp.tools.chunker.ChunkerME;
4344
import opennlp.tools.chunker.ChunkerModel;
@@ -49,7 +50,7 @@
4950
import java.util.Arrays;
5051
import java.util.List;
5152

52-
public class ChunkerProcessor extends StreamPipesDataProcessor {
53+
public class ChunkerProcessor implements IStreamPipesDataProcessor {
5354

5455
private static final String TAGS_FIELD_KEY = "tagsField";
5556
private static final String TOKENS_FIELD_KEY = "tokensField";
@@ -62,44 +63,59 @@ public class ChunkerProcessor extends StreamPipesDataProcessor {
6263
private ChunkerME chunker;
6364

6465
@Override
65-
public DataProcessorDescription declareModel() {
66-
return ProcessingElementBuilder
67-
.create("org.apache.streampipes.processors.textmining.jvm.chunker", 0)
68-
.category(DataProcessorType.ENRICH_TEXT)
69-
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
70-
.withLocales(Locales.EN)
71-
.requiredFile(Labels.withId(BINARY_FILE_KEY))
72-
.requiredStream(StreamRequirementsBuilder
73-
.create()
74-
.requiredPropertyWithUnaryMapping(
75-
EpRequirements.listRequirement(Datatypes.String),
76-
Labels.withId(TAGS_FIELD_KEY),
77-
PropertyScope.NONE)
78-
.requiredPropertyWithUnaryMapping(
79-
EpRequirements.listRequirement(Datatypes.String),
80-
Labels.withId(TOKENS_FIELD_KEY),
81-
PropertyScope.NONE)
82-
.build())
83-
.outputStrategy(OutputStrategies.append(
84-
EpProperties.listStringEp(
85-
Labels.withId(CHUNK_TYPE_FIELD_KEY),
86-
CHUNK_TYPE_FIELD_KEY,
87-
"http://schema.org/ItemList"),
88-
EpProperties.listStringEp(
89-
Labels.withId(CHUNK_FIELD_KEY),
90-
CHUNK_FIELD_KEY,
91-
"http://schema.org/ItemList")))
92-
.build();
66+
public IDataProcessorConfiguration declareConfig() {
67+
return DataProcessorConfiguration.create(
68+
ChunkerProcessor::new,
69+
ProcessingElementBuilder
70+
.create("org.apache.streampipes.processors.textmining.jvm.chunker", 0)
71+
.category(DataProcessorType.ENRICH_TEXT)
72+
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
73+
.withLocales(Locales.EN)
74+
.requiredFile(Labels.withId(BINARY_FILE_KEY))
75+
.requiredStream(StreamRequirementsBuilder
76+
.create()
77+
.requiredPropertyWithUnaryMapping(
78+
EpRequirements.listRequirement(Datatypes.String),
79+
Labels.withId(TAGS_FIELD_KEY),
80+
PropertyScope.NONE
81+
)
82+
.requiredPropertyWithUnaryMapping(
83+
EpRequirements.listRequirement(Datatypes.String),
84+
Labels.withId(TOKENS_FIELD_KEY),
85+
PropertyScope.NONE
86+
)
87+
.build())
88+
.outputStrategy(OutputStrategies.append(
89+
EpProperties.listStringEp(
90+
Labels.withId(CHUNK_TYPE_FIELD_KEY),
91+
CHUNK_TYPE_FIELD_KEY,
92+
"http://schema.org/ItemList"
93+
),
94+
EpProperties.listStringEp(
95+
Labels.withId(CHUNK_FIELD_KEY),
96+
CHUNK_FIELD_KEY,
97+
"http://schema.org/ItemList"
98+
)
99+
))
100+
.build()
101+
);
93102
}
94103

95104
@Override
96-
public void onInvocation(ProcessorParams parameters,
97-
SpOutputCollector spOutputCollector,
98-
EventProcessorRuntimeContext context) throws SpRuntimeException {
99-
this.tags = parameters.extractor().mappingPropertyValue(TAGS_FIELD_KEY);
100-
this.tokens = parameters.extractor().mappingPropertyValue(TOKENS_FIELD_KEY);
101-
String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
102-
byte[] fileContent = context.getStreamPipesClient().fileApi().getFileContent(filename);
105+
public void onPipelineStarted(
106+
IDataProcessorParameters params,
107+
SpOutputCollector collector,
108+
EventProcessorRuntimeContext context
109+
) {
110+
this.tags = params.extractor()
111+
.mappingPropertyValue(TAGS_FIELD_KEY);
112+
this.tokens = params.extractor()
113+
.mappingPropertyValue(TOKENS_FIELD_KEY);
114+
String filename = params.extractor()
115+
.selectedFilename(BINARY_FILE_KEY);
116+
byte[] fileContent = context.getStreamPipesClient()
117+
.fileApi()
118+
.getFileContent(filename);
103119

104120
InputStream modelIn = new ByteArrayInputStream(fileContent);
105121
ChunkerModel model;
@@ -113,19 +129,23 @@ public void onInvocation(ProcessorParams parameters,
113129
}
114130

115131
@Override
116-
public void onEvent(Event event,
117-
SpOutputCollector collector) throws SpRuntimeException {
118-
ListField tags = event.getFieldBySelector(this.tags).getAsList();
119-
ListField tokens = event.getFieldBySelector(this.tokens).getAsList();
132+
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
133+
ListField tags = event.getFieldBySelector(this.tags)
134+
.getAsList();
135+
ListField tokens = event.getFieldBySelector(this.tokens)
136+
.getAsList();
120137

121-
122-
String[] tagsArray = tags.castItems(String.class).toArray(String[]::new);
123-
String[] tokensArray = tokens.castItems(String.class).toArray(String[]::new);
138+
String[] tagsArray = tags.castItems(String.class)
139+
.toArray(String[]::new);
140+
String[] tokensArray = tokens.castItems(String.class)
141+
.toArray(String[]::new);
124142

125143
Span[] spans = chunker.chunkAsSpans(tokensArray, tagsArray);
126144

127145
List<String> chunks = TextMiningUtil.extractSpans(spans, tokensArray);
128-
String[] types = Arrays.stream(spans).map(Span::getType).toArray(String[]::new);
146+
String[] types = Arrays.stream(spans)
147+
.map(Span::getType)
148+
.toArray(String[]::new);
129149

130150
event.addField(ChunkerProcessor.CHUNK_TYPE_FIELD_KEY, types);
131151
event.addField(ChunkerProcessor.CHUNK_FIELD_KEY, chunks);
@@ -134,7 +154,6 @@ public void onEvent(Event event,
134154
}
135155

136156
@Override
137-
public void onDetach() throws SpRuntimeException {
138-
157+
public void onPipelineStopped() {
139158
}
140159
}

streampipes-extensions/streampipes-processors-text-mining-jvm/src/main/java/org/apache/streampipes/processors/textmining/jvm/processor/language/LanguageDetectionProcessor.java

Lines changed: 53 additions & 38 deletions
Original file line number | Diff line number | Diff line change
@@ -19,22 +19,23 @@
1919
package org.apache.streampipes.processors.textmining.jvm.processor.language;
2020

2121
import org.apache.streampipes.commons.exceptions.SpRuntimeException;
22+
import org.apache.streampipes.extensions.api.pe.IStreamPipesDataProcessor;
23+
import org.apache.streampipes.extensions.api.pe.config.IDataProcessorConfiguration;
2224
import org.apache.streampipes.extensions.api.pe.context.EventProcessorRuntimeContext;
25+
import org.apache.streampipes.extensions.api.pe.param.IDataProcessorParameters;
2326
import org.apache.streampipes.extensions.api.pe.routing.SpOutputCollector;
2427
import org.apache.streampipes.model.DataProcessorType;
2528
import org.apache.streampipes.model.extensions.ExtensionAssetType;
26-
import org.apache.streampipes.model.graph.DataProcessorDescription;
2729
import org.apache.streampipes.model.runtime.Event;
2830
import org.apache.streampipes.model.schema.PropertyScope;
2931
import org.apache.streampipes.sdk.builder.ProcessingElementBuilder;
3032
import org.apache.streampipes.sdk.builder.StreamRequirementsBuilder;
33+
import org.apache.streampipes.sdk.builder.processor.DataProcessorConfiguration;
3134
import org.apache.streampipes.sdk.helpers.EpProperties;
3235
import org.apache.streampipes.sdk.helpers.EpRequirements;
3336
import org.apache.streampipes.sdk.helpers.Labels;
3437
import org.apache.streampipes.sdk.helpers.Locales;
3538
import org.apache.streampipes.sdk.helpers.OutputStrategies;
36-
import org.apache.streampipes.wrapper.params.compat.ProcessorParams;
37-
import org.apache.streampipes.wrapper.standalone.StreamPipesDataProcessor;
3839

3940
import opennlp.tools.langdetect.Language;
4041
import opennlp.tools.langdetect.LanguageDetector;
@@ -45,7 +46,7 @@
4546
import java.io.IOException;
4647
import java.io.InputStream;
4748

48-
public class LanguageDetectionProcessor extends StreamPipesDataProcessor {
49+
public class LanguageDetectionProcessor implements IStreamPipesDataProcessor {
4950

5051
private static final String DETECTION_FIELD_KEY = "detectionField";
5152
static final String LANGUAGE_KEY = "language";
@@ -56,42 +57,55 @@ public class LanguageDetectionProcessor extends StreamPipesDataProcessor {
5657
private LanguageDetector languageDetector;
5758

5859
@Override
59-
public DataProcessorDescription declareModel() {
60-
return ProcessingElementBuilder
61-
.create("org.apache.streampipes.processors.textmining.jvm.languagedetection", 0)
62-
.category(DataProcessorType.ENRICH_TEXT)
63-
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
64-
.withLocales(Locales.EN)
65-
.requiredFile(Labels.withId(BINARY_FILE_KEY))
66-
.requiredStream(StreamRequirementsBuilder
67-
.create()
68-
.requiredPropertyWithUnaryMapping(
69-
EpRequirements.stringReq(),
70-
Labels.withId(DETECTION_FIELD_KEY),
71-
PropertyScope.NONE)
72-
.build())
73-
.outputStrategy(OutputStrategies.append(
74-
EpProperties.stringEp(
75-
Labels.withId(LANGUAGE_KEY),
76-
LANGUAGE_KEY,
77-
"http://schema.org/language"),
78-
EpProperties.doubleEp(
79-
Labels.withId(CONFIDENCE_KEY),
80-
CONFIDENCE_KEY,
81-
"https://schema.org/Float")))
82-
.build();
60+
public IDataProcessorConfiguration declareConfig() {
61+
return DataProcessorConfiguration.create(
62+
LanguageDetectionProcessor::new,
63+
ProcessingElementBuilder
64+
.create("org.apache.streampipes.processors.textmining.jvm.languagedetection", 0)
65+
.category(DataProcessorType.ENRICH_TEXT)
66+
.withAssets(ExtensionAssetType.DOCUMENTATION, ExtensionAssetType.ICON)
67+
.withLocales(Locales.EN)
68+
.requiredFile(Labels.withId(BINARY_FILE_KEY))
69+
.requiredStream(StreamRequirementsBuilder
70+
.create()
71+
.requiredPropertyWithUnaryMapping(
72+
EpRequirements.stringReq(),
73+
Labels.withId(DETECTION_FIELD_KEY),
74+
PropertyScope.NONE
75+
)
76+
.build())
77+
.outputStrategy(OutputStrategies.append(
78+
EpProperties.stringEp(
79+
Labels.withId(LANGUAGE_KEY),
80+
LANGUAGE_KEY,
81+
"http://schema.org/language"
82+
),
83+
EpProperties.doubleEp(
84+
Labels.withId(CONFIDENCE_KEY),
85+
CONFIDENCE_KEY,
86+
"https://schema.org/Float"
87+
)
88+
))
89+
.build()
90+
);
8391
}
8492

8593
@Override
86-
public void onInvocation(ProcessorParams parameters,
87-
SpOutputCollector spOutputCollector,
88-
EventProcessorRuntimeContext context) throws SpRuntimeException {
89-
String filename = parameters.extractor().selectedFilename(BINARY_FILE_KEY);
90-
byte[] fileContent = context.getStreamPipesClient().fileApi().getFileContent(filename);
91-
this.detection = parameters.extractor().mappingPropertyValue(DETECTION_FIELD_KEY);
94+
public void onPipelineStarted(
95+
IDataProcessorParameters params,
96+
SpOutputCollector collector,
97+
EventProcessorRuntimeContext context
98+
) {
99+
String filename = params.extractor()
100+
.selectedFilename(BINARY_FILE_KEY);
101+
byte[] fileContent = context.getStreamPipesClient()
102+
.fileApi()
103+
.getFileContent(filename);
104+
this.detection = params.extractor()
105+
.mappingPropertyValue(DETECTION_FIELD_KEY);
92106

93107
InputStream modelIn = new ByteArrayInputStream(fileContent);
94-
LanguageDetectorModel model = null;
108+
LanguageDetectorModel model;
95109
try {
96110
model = new LanguageDetectorModel(modelIn);
97111
} catch (IOException e) {
@@ -103,7 +117,9 @@ public void onInvocation(ProcessorParams parameters,
103117

104118
@Override
105119
public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeException {
106-
String text = event.getFieldBySelector(detection).getAsPrimitive().getAsString();
120+
String text = event.getFieldBySelector(detection)
121+
.getAsPrimitive()
122+
.getAsString();
107123
Language language = languageDetector.predictLanguage(text);
108124

109125
event.addField(LanguageDetectionProcessor.LANGUAGE_KEY, language.getLang());
@@ -113,7 +129,6 @@ public void onEvent(Event event, SpOutputCollector collector) throws SpRuntimeEx
113129
}
114130

115131
@Override
116-
public void onDetach() throws SpRuntimeException {
117-
132+
public void onPipelineStopped() {
118133
}
119134
}

0 commit comments

Comments
 (0)