Processing Graph

Pieter Bonte edited this page Jun 25, 2020 · 6 revisions

Streaming MASSIF Processing Graph

Streaming MASSIF allows you to define a processing graph consisting of various window, filter, abstraction, and CEP components.

Using the API

When running MASSIF, you can register a new graph at http://massifip:9000/register by issuing an HTTP POST with the configuration as a JSON object. The components object defines each component in the graph, while the configuration object defines how the various components are linked: "window": ["filter"] states that the window component forwards its results to the filter component.

A complete example can be found below:

{
  "components": {
    "print": {
      "type": "Sink",
      "impl": "PrintSink"
    },
    "window": {
      "size": 1,
      "slide": 1
    },
    "filter": {
      "type": "Filter",
      "queries": ["CONSTRUCT{?s ?p ?p.} WHERE {?s ?p ?o}"]
    },
    "abstract": {
      "type": "Abstract",
      "expressions": [
        {
          "head": "http://massif.test/EventA",
          "tail": "Observation"
        }
      ]
    },
    "kafka": {
      "type": "Source",
      "impl": "KafkaSource",
      "kafkaServer": "kafka-headless.kafka:9092",
      "kafkaTopic": "idlab.homelab.semantic"
    },
    "sink": {
      "type": "Sink",
      "impl": "HTTPGetSinkCombined",
      "path":"sink",
      "config":"last"
    }
  },
  "configuration": {
    "print": [],
    "abstract": [
      "print"
    ],
    "window": [
      "filter"
    ],
    "kafka": [
      "sink"
    ],
    "filter": [
      "abstract"
    ]
  }
}
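The components and configuration objects must stay consistent: every key in configuration should be a declared component, and every component it forwards to should exist as well. A minimal sketch of such a sanity check before POSTing the payload (the validation helper and the wiring below are our own illustration, not part of MASSIF):

```python
import json

def validate_graph(config_json):
    """Check that the 'configuration' wiring only references declared components."""
    graph = json.loads(config_json)
    components = set(graph["components"])
    for source, targets in graph["configuration"].items():
        if source not in components:
            raise ValueError(f"unknown component: {source}")
        for target in targets:
            if target not in components:
                raise ValueError(f"{source} forwards to undeclared component: {target}")
    return True

# A small kafka -> window -> filter -> print pipeline (illustrative wiring).
payload = json.dumps({
    "components": {
        "kafka": {"type": "Source", "impl": "KafkaSource",
                  "kafkaServer": "kafka-headless.kafka:9092",
                  "kafkaTopic": "idlab.homelab.semantic"},
        "window": {"size": 1, "slide": 1},
        "filter": {"type": "Filter",
                   "queries": ["CONSTRUCT{?s ?p ?o .} WHERE {?s ?p ?o}"]},
        "print": {"type": "Sink", "impl": "PrintSink"},
    },
    "configuration": {
        "kafka": ["window"],
        "window": ["filter"],
        "filter": ["print"],
        "print": [],
    },
})
print(validate_graph(payload))  # True
# The validated payload can then be POSTed to http://massifip:9000/register.
```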

Supported components

Currently, the following components and their parameters can be deployed through the API:

Sinks:

{"type":"Sink","impl":"PrintSink"} : A simple sink that prints the results to standard output. {"type":"Sink","impl":"httpgetsinkcombined","path":"somepath","config":"last|all"} : Allows to capture the results in this sink using a HTTP Get call at HTTP://massifip:9000/httpgetsink/somepath. The last config parameters will configure the sink to only store the last message, while all stores all the messages the sink receives until the sink is called.

Sources:

{"type": "Source","impl": "KafkaSource","kafkaServer": "kafkaserver","kafkaTopic": "kafkatopic"}: Connects to a Kafka broker and starts reading from a defined topic. {"type": "Source","impl": "HTTPGetSource","url": "someurl","timeout": sometimeout}: Calls the URL with a time out of sometimeout. {"type": "Source","impl": "HTTPPostSource","path": "somepath","port": someport}: Allows to HTTP Post call with data on http://massifip:someport/path {"type":"Source","impl":"FileSource","fileName":"pathToFile","timeout":timeout}: Reads a file from pathToFile line by line and waits timeout time in between lines.

Windowing:

{"size": windowSize,"slide": slideSize} : Defines a window of windowSize that resports results every slideSize.

Filtering:

{"type": "Filter","ontology":"someurl", "queries": ["somequery"]}: A filter that can optionally load an ontology hosted on someurl. All defined queries in queries (multiple possible) will be executed when the filter receives data.

Abstraction:

{"type": "Abstract","expressions": [{"head": "someHead","tail": "some definition in Manchester syntax"} ], "ontologyIRI":"someurl"}: launches an abstraction layer that can optionally load an ontology schema defined in ontologyIRI. The expressions define heads and tails, with heads a new class name and tail a definition in Manchester syntax.

CEP

TODO

Example:

Check out the sensor example, which loads a simple stream from file, triple by triple, every second, and prints the content of the window each time the window reports.
