Processing Graph
You can now define a processing graph in massif composed of window, filter, abstraction, or CEP components.
While massif is running, you can register a new graph by sending an HTTP POST to HTTP://massifip:9000/register with the configuration as a JSON object in the request body. The "components" object defines each component in the graph, while the "configuration" object defines how the components are linked. For example, "window": ["filter"] states that the window component forwards its results to the filter component.
A complete example is shown below:
{
  "components": {
    "print": {
      "type": "Sink",
      "impl": "PrintSink"
    },
    "window": {
      "size": 1,
      "slide": 1
    },
    "filter": {
      "type": "Filter",
      "queries": ["CONSTRUCT{?s ?p ?o.} WHERE {?s ?p ?o}"]
    },
    "abstract": {
      "type": "Abstract",
      "expressions": [
        {
          "head": "http://massif.test/EventA",
          "tail": "Observation"
        }
      ]
    },
    "kafka": {
      "type": "Source",
      "impl": "KafkaSource",
      "kafkaServer": "kafka-headless.kafka:9092",
      "kafkaTopic": "idlab.homelab.semantic"
    },
    "sink": {
      "type": "Sink",
      "impl": "HTTPGetSinkCombined",
      "path": "sink",
      "config": "last"
    }
  },
  "configuration": {
    "print": [],
    "abstract": ["print"],
    "window": ["filter"],
    "kafka": ["sink"],
    "filter": ["abstract"]
  }
}
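Below is a minimal Python sketch of registering a graph using only the standard library. It first checks that every name used in "configuration" refers to a component defined in "components". It then builds the POST request for HTTP://massifip:9000/register.

The helper names check_links and build_register_request are hypothetical. Sending the body with Content-Type: application/json is an assumption about what the endpoint accepts.

```python
import json
import urllib.request

# Placeholder host taken from this page; replace with your massif instance.
REGISTER_URL = "http://massifip:9000/register"


def check_links(graph: dict) -> list[str]:
    """Return a description of every link that names an undefined component."""
    components = graph["components"]
    problems = []
    for source, targets in graph["configuration"].items():
        if source not in components:
            problems.append(f"unknown source component: {source}")
        for target in targets:
            if target not in components:
                problems.append(f"{source} -> unknown component: {target}")
    return problems


def build_register_request(graph: dict, url: str = REGISTER_URL) -> urllib.request.Request:
    """Validate the graph and build (but do not send) the registering POST."""
    problems = check_links(graph)
    if problems:
        raise ValueError("; ".join(problems))
    return urllib.request.Request(
        url,
        data=json.dumps(graph).encode("utf-8"),
        headers={"Content-Type": "application/json"},  # assumed content type
        method="POST",
    )


if __name__ == "__main__":
    graph = {
        "components": {
            "window": {"size": 1, "slide": 1},
            "print": {"type": "Sink", "impl": "PrintSink"},
        },
        "configuration": {"window": ["print"], "print": []},
    }
    request = build_register_request(graph)
    print(request.get_method(), request.full_url)
    # Against a running instance:
    # with urllib.request.urlopen(request) as response:
    #     print(response.status, response.read().decode("utf-8"))
```

Validating the links on the client side is a design choice. It catches a misspelled component name before anything is sent, whatever the server would do with such a graph.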
The API currently supports deploying the following components with these parameters:
{"type":"Sink","impl":"PrintSink"}
: A simple sink that prints the results to standard output.
{"type":"Sink","impl":"httpgetsinkcombined","path":"somepath","config":"last|all"}
: Captures the results so they can be retrieved with an HTTP GET call at HTTP://massifip:9000/httpgetsink/somepath. Setting config to last makes the sink store only the last message, while all makes it store every message it receives until the sink is called.
{"type": "Source","impl": "KafkaSource","kafkaServer": "kafkaserver","kafkaTopic": "kafkatopic"}
: Connects to a Kafka broker and starts reading from a defined topic.
{"type": "Source","impl": "HTTPGetSource","url": "someurl","timeout": sometimeout}
: Calls someurl with a timeout of sometimeout.
{"type": "Source","impl": "HTTPPostSource","path": "somepath","port": someport}
: Accepts data sent with an HTTP POST call to http://massifip:someport/somepath.
{"type":"Source","impl":"FileSource","fileName":"pathToFile","timeout":timeout}
: Reads the file at pathToFile line by line, waiting timeout between consecutive lines.
{"size": windowSize,"slide": slideSize}
: Defines a window of size windowSize that reports its results every slideSize.
{"type": "Filter","ontology":"someurl", "queries": ["somequery"]}
: A filter that can optionally load an ontology hosted at someurl. All queries listed in queries (several are allowed) are executed each time the filter receives data.
{"type": "Abstract","expressions": [{"head": "someHead","tail": "some definition in Manchester syntax"} ], "ontologyIRI":"someurl"}
: Launches an abstraction layer that can optionally load an ontology schema from ontologyIRI. Each entry in expressions pairs a head, the name of a new class, with a tail, the definition of that class in Manchester syntax.
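The HTTPPostSource and HTTPGetSinkCombined components are meant to be driven from a client. The sketch below assumes a massif instance reachable under the placeholder host massifip. It builds the POST request that pushes data into an HTTPPostSource, and it reads an HTTPGetSinkCombined at HTTP://massifip:9000/httpgetsink/somepath.

The function names are hypothetical. The request body format is also an assumption: the sketch sends a single N-Triples line as text/plain. Check which serialization your graph expects.

```python
import urllib.request
from urllib.parse import quote

HOST = "massifip"  # placeholder host name used throughout this page


def post_source_request(data: str, path: str, port: int,
                        content_type: str = "text/plain") -> urllib.request.Request:
    """Build the POST that sends `data` to an HTTPPostSource with "path": path, "port": port.

    The content type is an assumption; adapt it to what your graph expects.
    """
    return urllib.request.Request(
        f"http://{HOST}:{port}/{quote(path)}",
        data=data.encode("utf-8"),
        headers={"Content-Type": content_type},
        method="POST",
    )


def sink_url(path: str, port: int = 9000) -> str:
    """URL of an HTTPGetSinkCombined registered with "path": path."""
    return f"http://{HOST}:{port}/httpgetsink/{quote(path)}"


def read_sink(path: str) -> str:
    """GET the sink contents as raw text (with config "all": the messages since the last call)."""
    with urllib.request.urlopen(sink_url(path)) as response:
        return response.read().decode("utf-8")


if __name__ == "__main__":
    triple = "<http://massif.test/s> <http://massif.test/p> <http://massif.test/o> ."
    request = post_source_request(triple, "somepath", 8080)
    print(request.get_method(), request.full_url)
    print(sink_url("sink"))
    # Against a running instance:
    # urllib.request.urlopen(request)
    # print(read_sink("sink"))
```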
Check out the sensor example. It reads a simple stream from a file, one triple every second, and prints the content of the window to a file each time the window reports, which is also every second.
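To reason offline about what a pipeline like the sensor example reports, the following sketch does two things:

1. It timestamps each line of a FileSource-like stream, so that line i arrives at i × timeout.
2. It evaluates a sliding window at every multiple of slide.

The sketch rests on these assumptions:

- Windows are time-based.
- size, slide, and timeout are all in seconds.
- The window reporting at time t holds the items with timestamps in [t - size, t).

massif's actual window semantics and units may differ. The function names are hypothetical.

```python
from typing import Iterable


def timestamp_lines(lines: Iterable[str], timeout: float) -> list[tuple[float, str]]:
    """Mimic a FileSource: line i is emitted at time i * timeout (no real sleeping)."""
    return [(i * timeout, line.rstrip("\n")) for i, line in enumerate(lines)]


def window_reports(events: list[tuple[float, str]], size: float, slide: float,
                   until: float) -> list[tuple[float, list[str]]]:
    """Window contents at each report time t = slide, 2*slide, ... up to `until`.

    Assumes the window reporting at t holds the events timestamped in [t - size, t).
    """
    reports = []
    k = 1
    while k * slide <= until:
        t = k * slide
        contents = [item for ts, item in events if t - size <= ts < t]
        reports.append((t, contents))
        k += 1
    return reports


if __name__ == "__main__":
    stream = ["triple-0\n", "triple-1\n", "triple-2\n"]
    events = timestamp_lines(stream, timeout=1)
    # size 1, slide 1: each report holds the triple read during the previous second.
    for t, contents in window_reports(events, size=1, slide=1, until=3):
        print(t, contents)
```

With size larger than slide, consecutive reports overlap. For example, size 2 and slide 1 make every triple appear in two successive reports.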