Stream Processing Interface (SPI) is the base for an embedded and distributed running stream processing engine written in C++.

luca-vogels/spi

Stream Processing Interface (SPI)

Zero-dependency programming interface for creating stream processing systems that process unstructured tuples and are not affected by the tuple size.

High Level Idea

The SPI engine consists of an arbitrary number of workers that can dynamically join and leave the cluster.
Each application creates an 'SPI' object to interact with the SPI engine; this object internally creates a new worker instance (multiple applications can run on the same node/hardware without conflicting).
Through the 'SPI' object the application can globally define 'Flows' that determine how tuples are processed by the engine.
Applications can subscribe to flows in order to receive the processed tuples.
Tuples are also published through the 'SPI' object.

The SPI engine fully hides networking and flow execution/optimization from the applications.
All SPI instances work together as if they were a single instance, which allows applications to seamlessly publish and receive tuples regardless of the underlying physical node.

Tuples are sent in a push-based manner from publishers to subscribers.
Subscribers, however, can either pull tuples from the engine or receive them via callback functions.
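
The two delivery styles can be illustrated with a small standalone sketch. It does not use the SPI API: `DemoSubscriber`, `deliver`, and `poll` are hypothetical names invented for this illustration, and tuples are reduced to plain strings. The engine side always pushes; the subscriber either queues tuples until the application pulls them or hands them to a callback immediately.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <optional>
#include <string>
#include <utility>

// Hypothetical stand-in for a subscriber endpoint; not SPI's Subscriber class.
class DemoSubscriber {
public:
    using Callback = std::function<void(const std::string&)>;

    // Pull mode: no callback, tuples are queued until poll() is called.
    DemoSubscriber() = default;

    // Callback mode: every delivered tuple is handed to the callback at once.
    explicit DemoSubscriber(Callback cb) : callback_(std::move(cb)) {
        assert(callback_ && "callback mode needs a callable");
    }

    // Called by the (simulated) engine: delivery itself is always push-based.
    void deliver(std::string tuple) {
        if (callback_) {
            callback_(tuple);
        } else {
            pending_.push_back(std::move(tuple));
        }
    }

    // Pull mode: returns the oldest queued tuple, or nullopt if none is queued.
    std::optional<std::string> poll() {
        if (pending_.empty()) return std::nullopt;
        std::string front = std::move(pending_.front());
        pending_.pop_front();
        return front;
    }

private:
    Callback callback_;
    std::deque<std::string> pending_;
};
```

In pull mode the application decides when to call `poll()`; in callback mode it gives up that control in exchange for not having to poll at all.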

By default, if subscribers cannot keep up with the incoming tuples, a back-pressure mechanism is triggered that propagates back to the publishers. Small performance drops are smoothed out by buffering.
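
As a rough illustration of how buffering and back-pressure interact, the following standalone sketch uses a bounded buffer. It is not SPI's implementation; `BoundedBuffer`, `tryPush`, and `tryPop` are hypothetical names. While the buffer has room, short slowdowns of the consumer are absorbed; once it is full, the producer is told to back off.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <optional>
#include <utility>

// Hypothetical bounded buffer between a publisher and a slow subscriber.
template <typename T>
class BoundedBuffer {
public:
    explicit BoundedBuffer(std::size_t capacity) : capacity_(capacity) {
        assert(capacity_ > 0 && "capacity must be positive");
    }

    // Returns false when the buffer is full. That refusal is the
    // back-pressure signal: the producer has to wait, retry, or slow down.
    bool tryPush(T value) {
        if (items_.size() >= capacity_) return false;
        items_.push_back(std::move(value));
        return true;
    }

    // Removes and returns the oldest element, or nullopt if the buffer is empty.
    std::optional<T> tryPop() {
        if (items_.empty()) return std::nullopt;
        T front = std::move(items_.front());
        items_.pop_front();
        return front;
    }

    std::size_t size() const { return items_.size(); }

private:
    std::size_t capacity_;
    std::deque<T> items_;
};
```

In a distributed setting the refusal would have to travel across the network to the publishing worker; this sketch only shows the local decision.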

Overview Graph

Getting Started

Setup

  1. Recommended: Set up a domain that points to one or more nodes in the cluster using A or AAAA DNS entries. The listed addresses are used as seed nodes through which new workers join the cluster. Other workers are discovered automatically as soon as a connection to one of the seed nodes is established. Alternative: Pass the IP(s) of other worker(s) in the cluster to the worker via the config or the CLI.
    It is not necessary to list the IPs of all servers, but at least one of the listed nodes should be reachable at any point in time; otherwise no new workers can join the cluster.
  2. Compile this project using the following steps (C++):
    a) Edit variables in root CMakeLists.txt
    b) Run command mkdir build && cd build
    c) Run command cmake -D CMAKE_C_COMPILER=gcc-11 -D CMAKE_CXX_COMPILER=g++-11 -D CMAKE_BUILD_TYPE=Release ..
    d) Run command make -j
  3. Customize settings using the config file build/bin/conf/SPI.conf in the build directory.
  4. Start using this library. All include files can be found in src/include/.
  5. Link this library spi_lib to your project.
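
To make step 1 more concrete, the sketch below shows how a domain with several A/AAAA records can be turned into a list of candidate seed addresses using the POSIX `getaddrinfo` call. This is only an illustration of the idea, not SPI's discovery code; the function name `resolveSeedAddresses` is hypothetical, and the example assumes a POSIX system.

```cpp
#include <arpa/inet.h>
#include <netdb.h>
#include <netinet/in.h>
#include <sys/socket.h>

#include <cassert>
#include <string>
#include <vector>

// Resolves all IPv4 (A) and IPv6 (AAAA) addresses of `host` into printable
// strings. Returns an empty vector if the name cannot be resolved.
std::vector<std::string> resolveSeedAddresses(const std::string& host) {
    assert(!host.empty());

    addrinfo hints{};
    hints.ai_family = AF_UNSPEC;      // accept both IPv4 and IPv6 results
    hints.ai_socktype = SOCK_STREAM;  // avoid one duplicate per socket type

    addrinfo* results = nullptr;
    if (getaddrinfo(host.c_str(), nullptr, &hints, &results) != 0) {
        return {};
    }

    std::vector<std::string> addresses;
    for (const addrinfo* it = results; it != nullptr; it = it->ai_next) {
        char text[INET6_ADDRSTRLEN] = {};
        const void* raw = nullptr;
        if (it->ai_family == AF_INET) {
            raw = &reinterpret_cast<const sockaddr_in*>(it->ai_addr)->sin_addr;
        } else if (it->ai_family == AF_INET6) {
            raw = &reinterpret_cast<const sockaddr_in6*>(it->ai_addr)->sin6_addr;
        } else {
            continue;  // ignore address families other than IPv4/IPv6
        }
        if (inet_ntop(it->ai_family, raw, text, sizeof(text)) != nullptr) {
            addresses.emplace_back(text);
        }
    }
    freeaddrinfo(results);
    return addresses;
}
```

A joining worker would then try the returned addresses one after another until a seed answers, which is why only one of the listed nodes needs to be reachable.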

Simple Example

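#include <cstdint>
#include <iostream>
#include <string>
#include <vector>
// Also include the SPI headers from src/include/ (header names are not listed in this README).

void handleReceivedTuples(void* mySubPtr, void*, uint32_t, std::vector<Tuple> tuples){
    Subscriber* mySub = static_cast<Subscriber*>(mySubPtr);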
    for(Tuple &tuple : tuples){
        TimeStamp ts = tuple.readTimeStamp();
        std::string msg = tuple.readString();
        std::cout << TimeUtils::toString(ts) << ": " << msg << std::endl;
    }
    mySub->releaseTuples(tuples); // important!
}


int main(int argc, char* argv[]){

    // Config
    Config config;
    config.loadFromCommandLineArgs(argc, argv); // not necessary
    // config.setClusterAddresses("cluster.example.com"); // not needed for standalone

    // SPI API
    SPI spi(config);



    // Define a flow
    std::string publisherGroup = "myPublishers";
    auto filterExpr = Flow::expressionTupleData(0, TYPE::TIMESTAMP, 0)->compare(CMP::GREATER, Flow::expressionConst((uint64_t)0));
    auto myFlow = Flow::input(publisherGroup)->filter(filterExpr)->output();

    // Register the flow
    std::string flowName = "myFlow";
    FlowOptions flowOptions; // optimization options
    spi.defineFlow(flowName, std::move(myFlow), flowOptions);



    // Create a publisher
    PublisherOptions publisherOptions; // optimization options
    publisherOptions.publisherGroup = publisherGroup;
    auto myPub = spi.createPublisher(publisherOptions);

    // Create a tuple and write some data to it
    uint64_t tupleSize = 32; // in bytes
    Tuple tuple = myPub->getTuple(tupleSize); // tuples must be requested from the publisher
    tuple.writeTimeStamp(TimeUtils::now());
    tuple.writeString("Hello, World!");

    // Publish the tuple
    myPub->publish(tuple);



    // Subscribe to a flow
    SubscriberOptions subscriberOptions; // optimization options
    auto mySub = spi.subscribe(flowName, subscriberOptions, handleReceivedTuples);


    Thread::sleepSec(5); // wait for 5 seconds

    // delete flow
    spi.deleteFlow(flowName);

    // blocks until all publishers and subscribers of this instance have been closed.
    spi.close();

    return 0;
}

More code examples can be found in examples/.

Contribute

This section gives some additional details for developers who want to contribute to this project.

High Level Idea

SPI is designed to be fully modular, with a worker class that glues all components together. To extend the functionality of a module, the respective interface must be implemented and registered. An implementation can access all functionality of SPI through the Worker API, which is similar to the SPI API but offers additional features.
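
The "implement an interface, then register it" pattern can be sketched in a few lines. The code below is a generic illustration and does not reflect SPI's actual interfaces or registration calls; `DemoOperator`, `DemoRegistry`, `registerFactory`, and `Doubler` are hypothetical names.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <memory>
#include <string>
#include <utility>

// Hypothetical module interface; SPI's real interfaces live in its sources.
struct DemoOperator {
    virtual ~DemoOperator() = default;
    virtual int apply(int value) const = 0;
};

// Hypothetical registry that maps a name to a factory for implementations.
class DemoRegistry {
public:
    using Factory = std::function<std::unique_ptr<DemoOperator>()>;

    // Returns false if an implementation is already registered under `name`.
    bool registerFactory(const std::string& name, Factory factory) {
        assert(factory && "factory must be callable");
        return factories_.emplace(name, std::move(factory)).second;
    }

    // Creates a new instance, or returns nullptr for an unknown name.
    std::unique_ptr<DemoOperator> create(const std::string& name) const {
        auto it = factories_.find(name);
        if (it == factories_.end()) return nullptr;
        return it->second();
    }

private:
    std::map<std::string, Factory> factories_;
};

// Example implementation of the interface.
struct Doubler final : DemoOperator {
    int apply(int value) const override { return value * 2; }
};
```

Registering factories rather than instances lets the core create as many instances as it needs, for example one per flow.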

The following image gives an overview of all components/modules that SPI comes with and how they build upon each other:

Overview Stack

Modules

If you want to extend the functionality of SPI, the following list shows where to find the classes or READMEs that explain each module in more detail:

Operator How a new physical operator can be implemented and registered is described in src/flow/operators/.

Flow Manager The flow manager converts a flow representation into a chain of physical operator instances and monitors that flow. How the existing flow manager can be adapted or a new one created is described in src/flow/managers/.
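
The conversion from a flow representation into a chain of physical operator instances can be pictured with the following standalone sketch. It is a simplified model, not SPI's flow manager: `FlowStep`, `StepKind`, `buildChain`, and `runChain` are hypothetical names, and tuples are reduced to single integers.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <optional>
#include <vector>

// Hypothetical logical description of one flow step.
enum class StepKind { FilterGreaterThan, AddConstant };
struct FlowStep {
    StepKind kind;
    std::int64_t operand;
};

// A physical operator returns a value to pass it on, or nullopt to drop it.
using PhysicalOperator =
    std::function<std::optional<std::int64_t>(std::int64_t)>;

// Converts the logical steps into executable operator instances, in order.
std::vector<PhysicalOperator> buildChain(const std::vector<FlowStep>& flow) {
    std::vector<PhysicalOperator> chain;
    chain.reserve(flow.size());
    for (const FlowStep& step : flow) {
        const std::int64_t operand = step.operand;
        switch (step.kind) {
            case StepKind::FilterGreaterThan:
                chain.push_back(
                    [operand](std::int64_t v) -> std::optional<std::int64_t> {
                        if (v > operand) return v;
                        return std::nullopt;
                    });
                break;
            case StepKind::AddConstant:
                chain.push_back(
                    [operand](std::int64_t v) -> std::optional<std::int64_t> {
                        return v + operand;
                    });
                break;
        }
    }
    return chain;
}

// Pushes one value through the chain; stops early if an operator drops it.
std::optional<std::int64_t> runChain(const std::vector<PhysicalOperator>& chain,
                                     std::int64_t input) {
    std::optional<std::int64_t> current = input;
    for (const PhysicalOperator& op : chain) {
        assert(op && "every chain entry must be callable");
        current = op(*current);
        if (!current) return std::nullopt;
    }
    return current;
}
```

Keeping the logical description separate from the executable chain is what gives a flow manager room to reorder or replace operators before execution.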

Flow Representation How to create new flow representation operators, which applications use to define how tuples should be processed and which are converted into chains of physical operator instances, is described in src/flow/.

Expressions An expression consists of expression operators that can be combined to evaluate a mathematical expression over tuple data. How to create new expression operators is described in src/flow/.
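
A composable expression of this kind is typically modelled as a small tree of operator nodes. The sketch below shows the idea in isolation; it is not SPI's expression implementation. All names (`DemoExpr`, `FieldExpr`, `ConstExpr`, `GreaterExpr`, and the helper functions) are hypothetical, and a tuple is simplified to a vector of unsigned integers.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical simplification: a tuple is a list of unsigned integer fields.
using DemoTuple = std::vector<std::uint64_t>;

// Base class of all expression operators.
struct DemoExpr {
    virtual ~DemoExpr() = default;
    virtual std::uint64_t eval(const DemoTuple& tuple) const = 0;
};
using ExprPtr = std::shared_ptr<const DemoExpr>;

// Leaf: a constant value.
struct ConstExpr final : DemoExpr {
    explicit ConstExpr(std::uint64_t v) : value(v) {}
    std::uint64_t eval(const DemoTuple&) const override { return value; }
    std::uint64_t value;
};

// Leaf: reads one field of the tuple.
struct FieldExpr final : DemoExpr {
    explicit FieldExpr(std::size_t i) : index(i) {}
    std::uint64_t eval(const DemoTuple& tuple) const override {
        assert(index < tuple.size() && "field index out of range");
        return tuple[index];
    }
    std::size_t index;
};

// Inner node: evaluates to 1 if lhs > rhs, otherwise to 0.
struct GreaterExpr final : DemoExpr {
    GreaterExpr(ExprPtr l, ExprPtr r) : lhs(std::move(l)), rhs(std::move(r)) {}
    std::uint64_t eval(const DemoTuple& tuple) const override {
        return lhs->eval(tuple) > rhs->eval(tuple) ? 1 : 0;
    }
    ExprPtr lhs;
    ExprPtr rhs;
};

// Small helpers so that expressions can be combined in one line.
ExprPtr field(std::size_t i) { return std::make_shared<FieldExpr>(i); }
ExprPtr constant(std::uint64_t v) { return std::make_shared<ConstExpr>(v); }
ExprPtr greater(ExprPtr l, ExprPtr r) {
    return std::make_shared<GreaterExpr>(std::move(l), std::move(r));
}
```

With these helpers, `greater(field(0), constant(0))` builds a predicate in the same spirit as the timestamp filter in the Simple Example.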

Metadata Packets Metadata packets are used to send metadata information through the network. They can be used for custom purposes as well. How to create new metadata packets is described in src/net/packets/.

Metadata Protocol A metadata protocol implements how metadata packets are sent between workers. How a new metadata protocol can be implemented is described in src/net/protocols/.

Tuple Protocol A tuple protocol creates tuple channels through which tuples can be sent between workers. How a new tuple protocol can be implemented is described in src/net/protocols/.

Memory Protocol A memory protocol implements the logic needed for GlobalMemory to be accessible by other workers. How a new memory protocol can be implemented is described in src/net/protocols/.

RDMA API The RDMA API is a lightweight wrapper around the IBV Verbs library with detailed documentation. It is almost fully independent from SPI itself. The default instance can be accessed through worker->getRDMA(). A detailed description can be found in src/net/rdma/.

Consensus Algorithm A consensus algorithm provides distributed locks and a distributed key-value store. How to implement a new consensus algorithm is described in src/net/consensus/.

Time Synchronization Time synchronization algorithms are used to synchronize the clocks of all workers in the cluster. How to implement a new time synchronization algorithm is described in src/net/time/.
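
This README does not say which time synchronization algorithms SPI ships with. As general background, the sketch below shows the classic NTP-style estimate of a clock offset from four timestamps; it is an assumption-laden illustration, not SPI's algorithm, and `ClockSample` and `estimateOffset` are hypothetical names. The estimate is exact only if the network delay is the same in both directions.

```cpp
#include <cassert>
#include <cstdint>

// Result of one request/response exchange between two workers.
struct ClockSample {
    std::int64_t offset;          // remote clock minus local clock
    std::int64_t roundTripDelay;  // time spent on the network in total
};

// t0: request sent (local clock)     t1: request received (remote clock)
// t2: response sent (remote clock)   t3: response received (local clock)
// All timestamps use the same unit, e.g. microseconds.
ClockSample estimateOffset(std::int64_t t0, std::int64_t t1,
                           std::int64_t t2, std::int64_t t3) {
    assert(t3 >= t0 && t2 >= t1 && "timestamps must not run backwards");
    ClockSample sample{};
    sample.offset = ((t1 - t0) + (t2 - t3)) / 2;
    sample.roundTripDelay = (t3 - t0) - (t2 - t1);
    return sample;
}
```

For example, with a remote clock that is 55 units ahead and a one-way delay of 5 units, the timestamps 100, 160, 165, 115 yield an offset of 55 and a round-trip delay of 10.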

File Structure

  • benchmarking/ Code of benchmarking tool and plotting script to perform end-to-end benchmarks and plot the measured metrics (bandwidth, throughput, latency).
  • build/bin/ Binary files to execute benchmarking tool and examples.
  • examples/ Simple code examples to get started with SPI.
  • illustrations/ Images that illustrate different parts of SPI.
  • measurements/ Contains CSV and JPG files with measurements of a fully zero-copy vs. a traditional implementation of a passthrough and a filter flow.
  • src/ Contains the actual source-code of SPI.
    • src/conf/ Contains the default config that is copied into build/bin/conf/ during compilation and which is used by the binary files inside build/bin/.
    • src/flow/ Implementation of flow related features.
    • src/include/ Include files for applications to interact with SPI.
    • src/logic/ Implementation of include files as well as of worker class that glues all components together.
    • src/net/ Implementation of network related features.
    • src/net/consensus/ Implementation of consensus algorithms.
    • src/net/packets/ Implementation of metadata packets that can be sent through metadata protocols.
    • src/net/protocols/ Implementation of metadata, tuple, and memory protocols.
    • src/net/rdma/ Implementation of the RDMA API, which is almost fully decoupled from SPI itself so that it can be used in other projects too.
    • src/net/time/ Implementation of time synchronization algorithms.
    • src/types/ Independent types that are used throughout the project.
    • src/utils/ Utility classes for various purposes.
