Nadi Interconnect is an application that orchestrates data streaming between nodes implementing NADI (Node Agnostic Datastream Interface), forming a programmable data flow graph. Developed by the skunkforce organization, it dynamically loads NADI-compliant libraries (DLLs on Windows, `.so` files on Linux/macOS) from a user-specified directory (default: `./nodes`), constructs nodes with unique instance identifiers, configures connections between node output and input streams across multiple channels, and manages message lifetimes (currently for single-consumer streams; multi-consumer support is planned). It is suited to IoT, sensor networks, data acquisition, and real-time analytics.
The NADI interface, defined in `include/nadi.h`, provides a minimalistic, platform-independent API for data streaming, using a `meta` (JSON string) + `data` (binary) pattern. It supports dynamic library loading, reentrant C-style callbacks, and cross-language compatibility (e.g., C++, Python).
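To make the `meta` + `data` pattern concrete, here is a minimal sketch that pairs a JSON description with a raw byte payload. The `demo_message` type, the helper names, and the format name `"double-array"` are assumptions made for illustration; the real message layout is whatever `include/nadi.h` defines.

```cpp
#include <cassert>
#include <cstring>
#include <string>
#include <vector>

// Hypothetical stand-in for a NADI-style message (not the struct from nadi.h).
struct demo_message {
    std::string meta;        // JSON text describing the payload
    std::vector<char> data;  // raw payload bytes
    unsigned int channel;    // channel identifier
};

// Packs doubles into the byte payload and describes them in meta.
// The format name "double-array" is invented for this example.
demo_message pack_doubles(const std::vector<double>& values, unsigned int channel) {
    demo_message msg;
    msg.meta = R"({"format":"double-array"})";
    msg.data.resize(values.size() * sizeof(double));
    if (!values.empty()) {
        std::memcpy(msg.data.data(), values.data(), msg.data.size());
    }
    msg.channel = channel;
    return msg;
}

// Reads the doubles back; the receiver relies on meta to know the byte layout.
std::vector<double> unpack_doubles(const demo_message& msg) {
    assert(msg.data.size() % sizeof(double) == 0);
    std::vector<double> values(msg.data.size() / sizeof(double));
    if (!values.empty()) {
        std::memcpy(values.data(), msg.data.data(), msg.data.size());
    }
    return values;
}
```

The point of the split is that the interconnect can route the bytes without understanding them, while a consumer inspects `meta` to decide how to decode `data`.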
- Dynamic Node Loading: Loads NADI-compliant DLLs/`.so` files from a user-specified directory (default: `./nodes`) using `LoadLibrary` (Windows) or `dlopen` (Linux/macOS).
- Node Construction/Destruction: Constructs multiple instances from a single node library with unique identifiers; destructs instances as needed.
- Programmable Stream Routing: Connects and disconnects producer node output streams to consumer node input streams via JSON control messages.
- Connection Management: Queries active stream connections between instances.
- Message Lifetime Management: Uses `scope-guard` for single-consumer streams; multi-consumer support planned.
- JSON Message Processing: Parses `channel`, `meta`, and `data` fields from `stdin` using `nlohmann_json`.
- Bootstrap Configuration: Initializes nodes and connections at startup via a JSON bootstrap file whose path is given with `--bootstrap` (command-line options are parsed with `CLI11`).
- Stdio Integration: Built-in `stdio` instance for `stdin`/`stdout`/`stderr` communication.
- Modern C++23: Leverages `std::expected`, `std::print`, and `scope-guard` for robust design.
- Cross-Platform: Runs on Linux, macOS, and Windows, with dependencies managed through `vcpkg`.
- CMake: Version 3.27 or higher.
- C++ Compiler: Supporting C++23 (e.g., GCC 13, Clang 16, MSVC 2022).
- Git: For cloning the repository.
- vcpkg: Automatically fetched for dependency management.
- Libraries: `nlohmann_json`, `CLI11`, `scope-guard` (installed via `vcpkg`).
- Operating System: Linux, macOS, or Windows.
- Clone the Repository:
git clone https://github.com/skunkforce/nadi_node_interconnect.git
cd nadi_node_interconnect
- Create a Build Directory:
mkdir build
cd build
- Configure with CMake:
cmake -S .. -B . -DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake
- Build the Project:
cmake --build . --config Release
The executable `nadi_interconnect` will be generated in the `build` directory.
- Prepare Node Libraries: Place NADI-compliant libraries in the node directory (default: `./nodes`, or specified via `--nodes`). Each library must export all NADI functions (`nadi_init`, `nadi_deinit`, `nadi_send`, `nadi_free`, `nadi_descriptor`). Example descriptor:
{"name":"node1","version":"1.0.0"}
- Prepare Bootstrap File (Optional): Create a JSON bootstrap file (default: `bootstrap.json`) to construct, connect, and configure nodes at startup. Example `bootstrap.json` for a WebSocket server:
{ "messages": [ { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "websocket_server", "instance": "ws_server" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["ws_server", 2], "target": ["interconnect", 61440] } }, { "channel": 61440, "meta": {"format": "json"}, "data": {"type": "nodes.loaded"} } ] }
- Run the Executable: Run with default settings:
./nadi_interconnect
Specify a custom node directory and/or bootstrap file:
./nadi_interconnect --nodes /path/to/nodes --bootstrap custom_bootstrap.json
On startup, the application parses `--nodes` and `--bootstrap` with `CLI11`, loads libraries from the specified node directory (default: `./nodes`), processes bootstrap messages, loads the `stdio` instance (with output channel `0xF000` connected to the interconnect’s input `0xF000`), and waits for JSON messages on `stdin`.
- Construct Nodes: Send a control message to construct a node (e.g., via `stdio`):
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.construct","node_name":"node1","instance":"node1_instance"}}' | ./nadi_interconnect
Constructs an instance of the `node1` library named `node1_instance`.
- Destruct Nodes: Send a control message to destruct a node instance:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.destruct","instance":"node1_instance"}}' | ./nadi_interconnect
Destructs the `node1_instance` instance.
- List Loaded Node Libraries: Send a control message to list loaded node libraries:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.loaded"}}' | ./nadi_interconnect
Response (sent to the originating instance, e.g., `stdio` channel 0 for `stdout` or a dynamic node):
{ "meta": {"format": "json"}, "data": { "type": "nodes.loaded.list", "nodes": [ {"name": "node1", "version": "1.0.0"}, {"name": "node2", "version": "1.1.0"} ] } }
- List Constructed Node Instances: Send a control message to list constructed node instances:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances"}}' | ./nadi_interconnect
Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.list", "instances": [ {"instance": "node1_instance"}, {"instance": "node1_instance2"}, {"instance": "node2_instance"} ] } }
- List Active Connections: Send a control message to list active stream connections:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.connections"}}' | ./nadi_interconnect
Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.list", "connections": [ {"source": ["node1_instance", 1], "target": ["node2_instance", 1]}, {"source": ["ws_server", 2], "target": ["interconnect", 61440]} ] } }
- Configure Stream Connections: Send a control message to connect streams:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.connections.connect","source":["node1_instance",1],"target":["node2_instance",1]}}' | ./nadi_interconnect
Connects `node1_instance`’s output channel 1 to `node2_instance`’s input channel 1.
- Disconnect Streams: Send a control message to disconnect streams:
echo '{"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.connections.disconnect","source":["node1_instance",1],"target":["node2_instance",1]}}' | ./nadi_interconnect
Disconnects the stream from `node1_instance` channel 1 to `node2_instance` channel 1.
- Send Data Messages: Pipe a NADI-compliant data message:
echo '{"channel":1,"meta":{"format":"json"},"data":{"value":42}}' | ./nadi_interconnect
The message is routed to connected nodes, with lifetimes managed for single-consumer streams.
- Monitor Output: Check `stdout` (stdio channel 0) for node metadata, connection status, and responses, or `stderr` (stdio channel 1) for errors, if using the `stdio` instance.
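The control messages in the steps above all share one envelope (channel 61440, `meta` with `"format":"json"`, and a `data` object carrying a `type`). As a rough sketch, a client could assemble the construct message as text like this. The helper name `make_construct_message` is hypothetical, and the sketch assumes the names contain no characters that need JSON escaping; a real client would more likely use a JSON library.

```cpp
#include <cassert>
#include <string>

// Builds a nodes.instances.construct control message as JSON text.
// Assumption: neither name contains a quote or backslash, so no escaping is done.
std::string make_construct_message(const std::string& node_name,
                                   const std::string& instance) {
    assert(node_name.find_first_of("\"\\") == std::string::npos);
    assert(instance.find_first_of("\"\\") == std::string::npos);
    return R"({"channel":61440,"meta":{"format":"json"},"data":{"type":"nodes.instances.construct","node_name":")"
           + node_name + R"(","instance":")" + instance + R"("}})";
}
```

Calling `make_construct_message("node1", "node1_instance")` yields the same JSON text that the construct step pipes into `./nadi_interconnect`.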
- WebSocket Server: Use a bootstrap file to construct a WebSocket server node, connect its output channel to the interconnect’s `0xF000` channel, and enable clients to list nodes (`nodes.loaded`, `nodes.instances`), construct/destruct nodes (`nodes.instances.construct`, `nodes.instances.destruct`), and manage connections (`nodes.instances.connections.connect`, `nodes.instances.connections.disconnect`).
{ "messages": [ { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "websocket_server", "instance": "ws_server" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["ws_server", 2], "target": ["interconnect", 61440] } } ] }
- Device Driver Logging: Use a bootstrap file to construct a device driver node and a file output node, connect their streams, configure logging, and destruct the nodes afterward.
{ "messages": [ { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "device_driver", "instance": "driver" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "file_output", "instance": "file" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["driver", 1], "target": ["file", 1] } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.destruct", "instance": "driver" } }, { "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.destruct", "instance": "file" } } ] }
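Both bootstrap examples have the same outer shape: an object with a `"messages"` array whose entries are ordinary control messages. A small sketch of generating such a file from already-serialized messages could look like the following; `make_bootstrap` is a hypothetical helper and it does not validate the messages it is given.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Wraps already-serialized JSON messages in a {"messages":[...]} document.
// The inputs are inserted verbatim; the caller must pass valid JSON objects.
std::string make_bootstrap(const std::vector<std::string>& messages) {
    std::string out = R"({"messages":[)";
    for (std::size_t i = 0; i < messages.size(); ++i) {
        if (i != 0) {
            out += ',';
        }
        out += messages[i];
    }
    out += "]}";
    return out;
}
```

Because the messages are processed in array order, the caller should list construct messages before the connect messages that refer to those instances, as the examples do.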
- Node Directory: Place NADI-compliant DLLs/`.so` files in the node directory (default: `./nodes`, or specified via `--nodes`). The `management` class loads these libraries, querying `nadi_descriptor()` for metadata.
- Bootstrap File: A JSON file (default: `bootstrap.json`, or specified via `--bootstrap`) with a `"messages"` array of control or data messages to construct, destruct, connect, disconnect, and configure nodes at startup. The `--bootstrap` option is parsed using `CLI11`.
- Stdio Instance: Automatically loaded as `"stdio"` with:
  - Output channel `0xF000`: Connected to the interconnect’s input `0xF000` for control messages.
  - Output channel 0: Prints to `stdout`.
  - Output channel 1: Prints to `stderr`.
  - Input channels: Can receive messages from other nodes, addressed via the `"channel"` field.
- Message Types:
  - Data Messages: For stream data, must include:
    - `channel`: Integer for the communication channel (not `0xF000`).
    - `meta`: JSON object with a `format` field (e.g., `"json"`, `"microseconds-double"`).
    - `data`: Payload, interpreted based on `meta.format`.
    Example:
{ "channel": 1, "meta": {"format": "microseconds-double"}, "data": [ [1625097600000000, 25.3], [1625097601000000, 25.5] ] }
  - Control Messages: For node construction, destruction, connection management, or node queries (on channel `0xF000`), must include:
    - `channel`: `61440` (`0xF000` in decimal).
    - `meta`: JSON object with `"format":"json"`.
    - `data`: JSON object with control details.
      - Node Construction:
        - `type`: `"nodes.instances.construct"`.
        - `node_name`: String specifying the node library’s name (from `nadi_descriptor`).
        - `instance`: String defining a unique identifier for the instance, used in connections.
        Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.construct", "node_name": "node1", "instance": "node1_instance" } }
      - Node Destruction:
        - `type`: `"nodes.instances.destruct"`.
        - `instance`: String identifying the instance to destruct.
        Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.destruct", "instance": "node1_instance" } }
      - Node Connection:
        - `type`: `"nodes.instances.connections.connect"`.
        - `source`: Tuple `[instance, channel]`, where `instance` is the instance identifier (string) and `channel` is a number.
        - `target`: Tuple `[instance, channel]`, specifying the target instance identifier and channel.
        Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.connect", "source": ["node1_instance", 1], "target": ["node2_instance", 1] } }
      - Node Disconnection:
        - `type`: `"nodes.instances.connections.disconnect"`.
        - `source`: Tuple `[instance, channel]`, identifying the source instance and channel.
        - `target`: Tuple `[instance, channel]`, identifying the target instance and channel.
        Example:
{ "channel": 61440, "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.disconnect", "source": ["node1_instance", 1], "target": ["node2_instance", 1] } }
      - Node Library Listing:
        - `type`: `"nodes.loaded"`.
        Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.loaded.list", "nodes": [ {"name": "node1", "version": "1.0.0"}, {"name": "node2", "version": "1.1.0"} ] } }
      - Node Instance Listing:
        - `type`: `"nodes.instances"`.
        Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.list", "instances": [ {"instance": "node1_instance"}, {"instance": "node1_instance2"}, {"instance": "node2_instance"} ] } }
      - Connection Listing:
        - `type`: `"nodes.instances.connections"`.
        Response (sent to the originating instance):
{ "meta": {"format": "json"}, "data": { "type": "nodes.instances.connections.list", "connections": [ {"source": ["node1_instance", 1], "target": ["node2_instance", 1]}, {"source": ["ws_server", 2], "target": ["interconnect", 61440]} ] } }
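The message types above are distinguished purely by channel: `0xF000` (61440 in decimal) carries control messages and every other channel carries data. A minimal sketch of that check, using hypothetical names (`control_channel`, `classify_channel`) that are not taken from the project's sources:

```cpp
#include <cassert>
#include <cstdint>

// The control channel described in this README; 0xF000 equals 61440.
constexpr std::uint32_t control_channel = 0xF000;
static_assert(control_channel == 61440, "0xF000 is 61440 in decimal");

enum class message_class { control, data };

// Classifies a message by its channel number alone.
message_class classify_channel(std::uint32_t channel) {
    return channel == control_channel ? message_class::control
                                      : message_class::data;
}
```

JSON has no hexadecimal literals, which is why the message examples write the channel as `61440` rather than `0xF000`.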
The NADI interface is defined in `include/nadi.h`. Key components include:
struct nadi_message {
char* meta; // Null-terminated JSON string
unsigned long meta_hash; // Hash of meta content (0 if unused)
char* data; // Raw binary data
unsigned int data_length;// Length of data in bytes
nadi_free_callback free; // Callback to free the message
nadi_node_handle instance; // Connection instance handle
unsigned int channel; // Channel identifier
};
- `nadi_init(nadi_node_handle* instance, nadi_receive_callback callback)`: Initializes a NADI instance with a receive callback.
- `nadi_deinit(nadi_node_handle instance)`: Deinitializes an instance, blocking until threads complete.
- `nadi_send(nadi_message* message, nadi_node_handle instance)`: Sends a message, transferring ownership on success.
- `nadi_free(nadi_message* message)`: Frees a message using its `free` callback.
- `nadi_descriptor()`: Returns a JSON string with node metadata (e.g., `{"name":"node","version":"1.0.0"}`).
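The ownership rule above (a message carries its own `free` callback, and whoever holds the message last must invoke it) can be illustrated with a hand-rolled guard. This is only a sketch: `owned_message`, `message_guard`, `release`, and `consume` are hypothetical names, the callback signature is assumed, and the project itself uses the `scope-guard` library rather than this class.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Hypothetical message type; the real struct is declared in include/nadi.h.
struct owned_message;
using owned_free_callback = void (*)(owned_message*);

struct owned_message {
    std::string meta;
    owned_free_callback free;  // how the producer wants the message released
};

// Counts releases so the behaviour can be checked.
static int freed_count = 0;

// Example free callback for messages allocated with new.
void release(owned_message* msg) {
    ++freed_count;
    delete msg;
}

// Minimal scope guard: invokes the message's own free callback on scope exit.
class message_guard {
public:
    explicit message_guard(owned_message* msg) : msg_(msg) {}
    ~message_guard() {
        if (msg_ != nullptr && msg_->free != nullptr) {
            msg_->free(msg_);
        }
    }
    message_guard(const message_guard&) = delete;
    message_guard& operator=(const message_guard&) = delete;

private:
    owned_message* msg_;
};

// A single consumer reads the message; the guard frees it when consume returns.
std::size_t consume(owned_message* msg) {
    message_guard guard(msg);
    return msg->meta.size();
}
```

This covers only the single-consumer case. With several consumers, the message could not be freed until the last one finished, which would need some form of shared counting.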
struct nadi_library {
nadi_init_pt init; // Pointer to nadi_init
nadi_deinit_pt deinit; // Pointer to nadi_deinit
nadi_send_pt send; // Pointer to nadi_send
nadi_free_pt free; // Pointer to nadi_free
nadi_descriptor_pt descriptor; // Pointer to nadi_descriptor
// Platform-specific library handle (HMODULE on Windows, void* on Linux/macOS)
};
- `nadi_library load_node(std::string path)`: Loads a NADI-compliant library, mapping its exported functions to `nadi_library` pointers.
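Since a node library must export all five NADI functions, a loader has to notice when one is missing. The sketch below shows only that bookkeeping, independent of the platform calls. The `missing_exports` helper is hypothetical, and in a real loader the set of available names would come from `dlsym` or `GetProcAddress` lookups rather than a hard-coded set.

```cpp
#include <array>
#include <cassert>
#include <set>
#include <string>
#include <vector>

// The five entry points this README says every node library must export.
const std::array<const char*, 5> required_exports = {
    "nadi_init", "nadi_deinit", "nadi_send", "nadi_free", "nadi_descriptor"};

// Returns the required names that are absent from `available`,
// in the order they appear in required_exports.
std::vector<std::string> missing_exports(const std::set<std::string>& available) {
    std::vector<std::string> missing;
    for (const char* name : required_exports) {
        if (available.count(name) == 0) {
            missing.emplace_back(name);
        }
    }
    return missing;
}
```

A loader built this way could reject a library with a message naming the missing symbols instead of failing later on a null function pointer.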
- `load_nodes(const std::string& dir)`: Loads NADI-compliant libraries from the specified directory using `load_node`.
- `to_json()`: Returns JSON representation of loaded nodes.
- `callback(nadi_message* msg)`: Processes messages:
  - Routes data messages (non-`0xF000` channels) to connected nodes.
  - Parses control messages (`0xF000` channel) for node construction (`nodes.instances.construct`), destruction (`nodes.instances.destruct`), connection management (`nodes.instances.connections.connect`, `nodes.instances.connections.disconnect`, `nodes.instances.connections`), node library listing (`nodes.loaded`), node instance listing (`nodes.instances`), or bootstrap initialization.
  - Manages message lifetimes with `scope-guard` (single-consumer); multi-consumer support planned.
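The control-message branch of `callback` has to map the `type` string onto one of the operations listed above. One plausible way to structure that, shown here as a sketch with hypothetical names (`control_kind`, `classify_control`) rather than the project's actual code, is a lookup table:

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// One entry per control message type documented in this README.
enum class control_kind {
    construct,
    destruct,
    connect,
    disconnect,
    list_connections,
    list_loaded,
    list_instances,
    unknown
};

// Maps the "type" field of a control message to a handler category.
control_kind classify_control(const std::string& type) {
    static const std::unordered_map<std::string, control_kind> table = {
        {"nodes.instances.construct", control_kind::construct},
        {"nodes.instances.destruct", control_kind::destruct},
        {"nodes.instances.connections.connect", control_kind::connect},
        {"nodes.instances.connections.disconnect", control_kind::disconnect},
        {"nodes.instances.connections", control_kind::list_connections},
        {"nodes.loaded", control_kind::list_loaded},
        {"nodes.instances", control_kind::list_instances},
    };
    const auto it = table.find(type);
    return it == table.end() ? control_kind::unknown : it->second;
}
```

An exact-match table avoids prefix pitfalls: `nodes.instances` and `nodes.instances.connections` share a prefix but are different requests.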
- `load_nodes(const std::string& path)`: Loads libraries from the specified directory using `find_nodes::get_node_paths` and `load_node`.
- `construct_node`: Constructs a node instance from a library, initializing it with `nadi_init`.
- `destruct_node`: Destructs an instance, deinitializing it with `nadi_deinit`.
- `send_loaded_list`: Sends a `nodes.loaded.list` response with library metadata.
- `send_instances_list`: Sends a `nodes.instances.list` response with instance identifiers.
- `lib_from_instance`: Retrieves the `nadi_library` for an instance handle.
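Instance identifiers must be unique because connections refer to instances by name. The following sketch shows one way the bookkeeping behind `construct_node` and `destruct_node` could enforce that. The `instance_registry` class is hypothetical and stores only names; a real implementation would also hold the instance handle and call `nadi_init`/`nadi_deinit`.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical registry: instance identifier -> name of the library it came from.
class instance_registry {
public:
    // Registers a new instance; returns false if the identifier is already taken.
    bool construct(const std::string& node_name, const std::string& instance) {
        return instances_.emplace(instance, node_name).second;
    }

    // Removes an instance; returns false if no such instance exists.
    bool destruct(const std::string& instance) {
        return instances_.erase(instance) == 1;
    }

    std::size_t size() const { return instances_.size(); }

private:
    std::map<std::string, std::string> instances_;
};
```

Several instances of the same library are allowed (for example `node1_instance` and `node1_instance2`), but a second instance under an existing identifier is rejected.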
- `connect`: Connects a source instance and channel to a target instance and channel (implementation incomplete).
- `disconnect`: Disconnects a source-target stream (implementation incomplete).
- `send_connections_list`: Sends a `nodes.instances.connections.list` response with active connections (implementation incomplete).
- `destinations_from`: Returns destinations for a given source instance and channel.
(Note: Full details for connection management require complete implementations in `message_routing.hpp`.)
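Because the routing functions are described as incomplete, the following is not the project's implementation but one possible shape for it: a table keyed by `(instance, channel)` that stores the list of targets. The `routing_table` class and the `endpoint` alias are assumptions made for this sketch; only the function names `connect`, `disconnect`, and `destinations_from` come from the list above.

```cpp
#include <algorithm>
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// An endpoint is an (instance identifier, channel) pair, as in the JSON tuples.
using endpoint = std::pair<std::string, unsigned int>;

class routing_table {
public:
    // Adds a route; returns false if the same route already exists.
    bool connect(const endpoint& source, const endpoint& target) {
        std::vector<endpoint>& targets = routes_[source];
        if (std::find(targets.begin(), targets.end(), target) != targets.end()) {
            return false;
        }
        targets.push_back(target);
        return true;
    }

    // Removes a route; returns false if it was not present.
    bool disconnect(const endpoint& source, const endpoint& target) {
        const auto it = routes_.find(source);
        if (it == routes_.end()) {
            return false;
        }
        std::vector<endpoint>& targets = it->second;
        const auto pos = std::find(targets.begin(), targets.end(), target);
        if (pos == targets.end()) {
            return false;
        }
        targets.erase(pos);
        if (targets.empty()) {
            routes_.erase(it);
        }
        return true;
    }

    // Returns every target connected to the given source (empty if none).
    std::vector<endpoint> destinations_from(const endpoint& source) const {
        const auto it = routes_.find(source);
        return it == routes_.end() ? std::vector<endpoint>{} : it->second;
    }

private:
    std::map<endpoint, std::vector<endpoint>> routes_;
};
```

Storing a vector of targets per source leaves room for one output stream feeding several consumers, which is the case the planned multi-consumer lifetime management would have to handle.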
To build with debug symbols:
cmake -S .. -B . -DCMAKE_BUILD_TYPE=Debug -DCMAKE_TOOLCHAIN_FILE=../vcpkg/scripts/buildsystems/vcpkg.cmake
cmake --build . --config Debug
Contributions are welcome! Follow these steps:
- Fork the repository.
- Create a feature branch (`git checkout -b feature/your-feature`).
- Commit changes (`git commit -m 'Add your feature'`).
- Push to the branch (`git push origin feature/your-feature`).
- Open a Pull Request.
See CONTRIBUTING.md for guidelines.
Licensed under the MIT License. See LICENSE for details.