This repository provides a practical, modular framework for Edge AI data collection: gathering data at the edge, pre-processing it for AI and IoT applications, and integrating it into back-end enterprise systems. In modern industrial and enterprise environments, data must be gathered reliably from sensors and edge devices, validated against schemas, and prepared for downstream use in analytics, model training, and decision-making pipelines.
It exists because industries need standardized, open-source tooling to collect, transform, and validate data from diverse environments before applying machine learning or analytics. Rather than siloed scripts or vendor lock-in, this repo offers a community-driven, extensible foundation for building traceable, interoperable, and cost-effective Edge AI pipelines.
By maintaining structured, schema-validated JSONL samples (ready for use as AI training data) and the tools around them, this project helps ensure that:
- Edge devices can integrate smoothly with centralized AI/ML workflows
- Data pipelines remain consistent and machine-readable
- Developers can easily extend support for new sensor types and environments
- Enterprises can maintain auditability and compliance for sensitive workloads
- Traceable Logging
  - Centralized logger (`logger.py`) shared across all adapters and pipelines
  - Logs are written to both the console and rotating log files under `./data/logs/`
  - Ensures full traceability of operations and errors, making debugging, audits, and compliance reporting easier
- On-Chain Anchoring (Bitcoin) for Enterprise-Grade Trust (see the sketch below):
  - Visibility: auditable proofs that data existed at a certain point in time
  - Traceability: linkage from edge samples → manifests → anchored Merkle roots
  - Immutability: tamper evidence and regulatory assurance via the Bitcoin mainnet/testnet
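As a rough illustration of the anchoring flow, the sketch below hashes each file in a partition and folds the digests into a single Merkle root. The exact tree construction used by `tools/update_manifest.py` may differ, and `sha256_file`/`merkle_root` are hypothetical names:

```python
import hashlib
from pathlib import Path

def sha256_file(path: Path) -> bytes:
    """Stream a file through SHA-256 so large captures never load fully into RAM."""
    h = hashlib.sha256()
    with path.open("rb") as f:
        for chunk in iter(lambda: f.read(1 << 20), b""):
            h.update(chunk)
    return h.digest()

def merkle_root(leaves: list[bytes]) -> bytes:
    """Pairwise-hash leaf digests upward; duplicate the last node on odd-sized levels."""
    if not leaves:
        raise ValueError("empty partition")
    level = list(leaves)
    while len(level) > 1:
        if len(level) % 2:
            level.append(level[-1])
        level = [hashlib.sha256(level[i] + level[i + 1]).digest()
                 for i in range(0, len(level), 2)]
    return level[0]

files = sorted(Path("data/samples/hot/vision").glob("**/*.jsonl"))
root = merkle_root([sha256_file(p) for p in files])
print(root.hex())  # this hex root is what an OP_RETURN anchor would commit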
This framework is applicable across a wide range of industries:
- Manufacturing – Predictive maintenance, anomaly detection, production quality control
- Transportation & Rail – Fleet monitoring, video analytics, passenger safety
- Energy & Utilities – Smart grids, substation monitoring, IoT telemetry
- Healthcare – Connected devices, operational AI in hospitals
- Finance/Enterprise – Audit logs, secure data pipelines, Bitcoin on-chain verification
The repository currently supports ingestion from multiple industrial protocols and data sources:
**Binary / Legacy Protocols**
- CAN (Controller Area Network)
- Modbus
- PCAP (packet captures)
- Syslog
**Industrial / Enterprise Systems**
- OPC UA
- ERP (Odoo adapter)
- SCADA systems (via adapters)
**General Adapters**
- File-based JSON/JSONL writer
The framework currently supports structured and semi-structured data formats commonly used in industry:
- JSON / JSONL (structured event and log records)
- Binary frames from CAN, Modbus, PCAP
- Syslog text records
- ERP/SCADA structured outputs
- Video frames extracted from media files
Coverage: estimated 70–80% of data and log formats typically encountered across manufacturing, energy, transportation, and enterprise environments.
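For orientation, a hot-tier JSONL record is simply one JSON object per line. The field names below are illustrative only; the authoritative contract lives in `schema/temperature.schema.json`:

```python
import json, datetime
from pathlib import Path

# Illustrative record; consult schema/temperature.schema.json for the real contract.
record = {
    "ts": datetime.datetime.now(datetime.timezone.utc).isoformat(),
    "site": "A",
    "device": "D",
    "topic": "temperature",
    "value_c": 21.7,
}

# JSONL = one compact JSON object per line, appended to the hot tier.
out = Path("data/samples/hot/temperature/latest.jsonl")
out.parent.mkdir(parents=True, exist_ok=True)
with out.open("a", encoding="utf-8") as f:
    f.write(json.dumps(record, separators=(",", ":")) + "\n")
```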
| Layer | Format | Use case |
|---|---|---|
| Hot logs | JSONL (`*.jsonl` / `*.jsonl.zst`) | Append-only events, decisions, ops logs |
| Hot streams (binary) | Protobuf (`*.pbr`) | High-rate sensor readings (compact, schema’d) |
| Batch analytics | Parquet (`*.parquet`, zstd) | Columnar storage for queries and features |
| Governance | JSON Schema / Avro (`*.schema.json` / `*.avsc`) | Data contracts & validation |
| Ops logs | LOG (`*.log`) | Centralized process/ingestion logs (rotating) |
| Media | JPEG/PNG, MP4 | Vision/audio artifacts with sidecar JSON |
| Legacy capture | PCAP | Network packet-level logs |
| Fieldbus | CAN, Modbus | Industrial machine telemetry |
| System | Syslog | Infrastructure/system events |
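To illustrate the hop from the hot JSONL tier to the batch-analytics tier, here is a hedged conversion sketch using `pyarrow` (an assumed dependency; the repo's actual batch writer may differ):

```python
import json

import pyarrow as pa
import pyarrow.parquet as pq

# Load rotated hot-tier records (rotation caps these files, so in-memory is fine).
with open("data/samples/hot/temperature/latest.jsonl", encoding="utf-8") as f:
    rows = [json.loads(line) for line in f]

# Columnar conversion with zstd compression, matching the storage conventions below.
table = pa.Table.from_pylist(rows)
pq.write_table(table, "data/samples/batch/temperature.parquet", compression="zstd")
```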
This repository is designed to work with a wide range of industrial edge computing devices that collect, validate, and stream sensor data.
While the code is hardware-agnostic, the following classes of machines are good candidates:
**Industrial edge PCs**
- Vendors: Sintrones, Advantech, Aaeon, OnLogic, Vecow
- Use cases: factory automation, AI inference, SCADA/PLC integration
- I/O support: Ethernet, RS-232/485, CAN bus, GPIO, USB

**Single-board computers & AI dev kits**
- Examples: NVIDIA Jetson (Nano, Xavier, Orin), Raspberry Pi 5, BeagleBone, Orange Pi
- Use cases: AI/ML inference, temperature/vibration data logging, low-power deployments
- I/O support: CSI camera, SPI/I²C, GPIO, Wi-Fi/BT, LTE modules

**Industrial IoT gateways**
- Examples: Moxa UC series, HMS Anybus, Siemens IoT2040
- Use cases: protocol translation (OPC UA, Modbus, MQTT, EtherCAT), legacy machine integration
- I/O support: Fieldbus, serial, Ethernet

**AI accelerator hardware**
- Examples: Intel NUC + Movidius Myriad, Google Coral Dev Board, FPGA edge cards
- Use cases: defect detection, predictive maintenance, real-time inference
All adapters and pipelines in this repository use a central logging utility located at `common/logger.py`.

- Logs to the console (for real-time monitoring).
- Logs to rotating log files under `./data/logs/` (traceable history).
- Default rotation: 5 MB per file, up to 5 backups.
- The environment variable `EDGE_AI_LOG_DIR` can override the log storage location.
- Default path: `./data/logs/<adapter_or_pipeline>.log`
- Examples:
  - `./data/logs/can_reader.log`
  - `./data/logs/video_recognition.log`
```text
2025-08-20 12:41:05 [INFO] can_reader: Listening on CAN interface can0
2025-08-20 12:41:07 [ERROR] opcua_reader: Connection timeout
2025-08-20 12:41:10 [DEBUG] video_recognition: Extracted frame #25
```
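The actual implementation lives in `common/logger.py`; as a hedged approximation, a console-plus-rotating-file setup that produces lines like the samples above could look like this (`get_logger` is a hypothetical name):

```python
import logging, os
from logging.handlers import RotatingFileHandler

def get_logger(name: str) -> logging.Logger:
    """Sketch of a shared logger: console + rotating file under EDGE_AI_LOG_DIR."""
    log_dir = os.environ.get("EDGE_AI_LOG_DIR", "./data/logs")
    os.makedirs(log_dir, exist_ok=True)
    logger = logging.getLogger(name)
    if logger.handlers:            # already configured; reuse across adapters
        return logger
    logger.setLevel(logging.DEBUG)
    fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(name)s: %(message)s",
                            datefmt="%Y-%m-%d %H:%M:%S")
    for handler in (
        logging.StreamHandler(),   # real-time console monitoring
        RotatingFileHandler(os.path.join(log_dir, f"{name}.log"),
                            maxBytes=5 * 1024 * 1024, backupCount=5),
    ):
        handler.setFormatter(fmt)
        logger.addHandler(handler)
    return logger

log = get_logger("can_reader")
log.info("Listening on CAN interface can0")
```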
📦 Impact: This makes all data collection and vision pipelines fully traceable for audits, debugging, and enterprise integration.
- Compression: zstd across Parquet; gzip/zstd on rotated JSONL (`*.jsonl.zst`)
- Timestamps: UTC, ISO-8601 in JSONL; INT64 in Parquet with timezone metadata
- Schemas: keep a `schema_fingerprint` (SHA-256) in file metadata/headers (see the sketch below)
- Manifests: one per partition directory, with per-file SHA-256 hashes and a Merkle root
- Rotation: JSONL every 100 MB or 15 min; Parquet targets 128–512 MB files
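The `schema_fingerprint` convention can be implemented by hashing a canonicalized form of the schema document; the canonicalization below (sorted keys, compact separators) is an assumption, not necessarily what the repo's tools do:

```python
import hashlib, json

def schema_fingerprint(schema_path: str) -> str:
    """SHA-256 over the canonical (sorted-key, compact) schema JSON."""
    with open(schema_path, encoding="utf-8") as f:
        schema = json.load(f)
    canonical = json.dumps(schema, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(canonical.encode("utf-8")).hexdigest()

# Store this hex digest in the file's metadata/header so consumers can
# detect contract drift before parsing.
print(schema_fingerprint("schema/temperature.schema.json"))
```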
```bash
# Modbus (reads holding registers)
python tools/run_adapter.py modbus --host 192.168.1.10 --unit 1 --address 0 --count 10 --output data/samples/modbus/latest.jsonl

# CAN (Linux SocketCAN)
python tools/run_adapter.py can --channel can0 --bustype socketcan --bitrate 500000 --output data/samples/can/capture.jsonl

# PCAP (capture 100 HTTP packets)
python tools/run_adapter.py pcap --iface eth0 --filter "tcp port 80" --count 100 --output data/samples/pcap/http.jsonl

# Syslog listener (UDP 5140)
python tools/run_adapter.py syslog --host 0.0.0.0 --port 5140 --output data/samples/syslog/events.jsonl

# OPC UA
python tools/run_adapter.py opcua --endpoint opc.tcp://localhost:4840 --nodes ns=2;i=2 ns=2;i=3 --output data/samples/opcua/readings.jsonl

# ERP (Odoo)
python tools/run_adapter.py erp_odoo --url http://odoo.local:8069 --db mydb --user admin --password secret --model res.partner --domain "[]" --fields '["name","create_date"]' --limit 10 --output data/samples/erp/partners.jsonl

# Run the video recognition pipeline:
python -m vision.pipelines.video_recognition --input ./data/media/video/sample.mp4 --out ./data/samples/hot/vision --every_ms 500
```
Install optional dependencies as needed:
```bash
pip install -r requirements.txt   # or selectively: pip install python-can pymodbus scapy asyncua
```
```bash
# Docker route
docker build -t edge-ai-data:latest .
docker run --rm -v $PWD/data:/app/data edge-ai-data:latest python -m src.cli ingest --config ./configs/example.yaml
```

```bash
# Local virtualenv route
python -m venv .venv && . .venv/bin/activate
pip install -r requirements.txt
python -m src.cli ingest --config ./configs/config.yaml
```
JSON Schema:

```bash
python tools/validate_jsonl.py --schema ./schema/temperature.schema.json --input ./data/samples/hot/temperature/2025-08-19/temperature-2025-08-19T04-00.jsonl
```

Avro (optional): use `fastavro` for round-trip tests.
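A minimal `fastavro` round-trip test might look like the following; the Avro schema shown is illustrative rather than one shipped in the repo:

```python
import io
from fastavro import writer, reader, parse_schema

# Illustrative schema; real contracts would live under schema/ as *.avsc files.
schema = parse_schema({
    "type": "record",
    "name": "TemperatureSample",
    "fields": [
        {"name": "ts", "type": "string"},
        {"name": "value_c", "type": "double"},
    ],
})

records = [{"ts": "2025-08-19T04:00:00Z", "value_c": 21.7}]

buf = io.BytesIO()
writer(buf, schema, records)          # serialize
buf.seek(0)
assert list(reader(buf)) == records   # round-trip must be lossless
```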
See `decision_engine/engine.py` for the interface. You can drop in rule packs and model runners (ONNX/TensorRT).
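The engine's actual contract is defined in `decision_engine/engine.py`; purely as a hedged illustration, a drop-in rule pack could be as simple as callables that map a record to a decision (`Rule`, `overheat_rule`, and `evaluate` are hypothetical names):

```python
from typing import Callable, Optional

# Hypothetical rule-pack shape: each rule maps a record to a decision string or None.
Rule = Callable[[dict], Optional[str]]

def overheat_rule(record: dict) -> Optional[str]:
    """Flag temperature samples above a fixed threshold."""
    if record.get("topic") == "temperature" and record.get("value_c", 0) > 85:
        return "ALERT: overheat"
    return None

def evaluate(record: dict, rules: list[Rule]) -> list[str]:
    """Run every rule; collect the non-empty decisions."""
    return [d for rule in rules if (d := rule(record)) is not None]

print(evaluate({"topic": "temperature", "value_c": 91.2}, [overheat_rule]))
```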
`.github/workflows/ci.yml` runs lint, type checks, and smoke tests; customize as needed.
**Makefile route**

```bash
make lab          # builds the image and launches JupyterLab on http://localhost:8888
# or run in background:
make lab-detach
make stop         # stop detached lab
```

**docker-compose route**

```bash
docker compose up jupyter
# open http://localhost:8888
```

Jupyter starts without a token for local development. For remote servers, set a token/password.
Run lightweight recognition that writes JSONL detections and optional annotated frames.
**Images**

```bash
python -m vision.pipelines.image_recognition --input ./data/media/images --out ./data/samples/hot/vision
```

**Video**

```bash
python -m vision.pipelines.video_recognition --input ./data/media/video/sample.mp4 --out ./data/samples/hot/vision --every_ms 500
```
When running vision pipelines, annotated frames (with drawn boxes and labels) are saved under `data/samples/hot/vision/frames/`. Each image or sampled video frame gets a `*-annot.png` showing detected objects with bounding boxes and confidence scores. This is useful for smart-factory data analysis where visual confirmation of detections is important.
Add `--annotate` to save PNGs with bounding boxes and labels. Optionally set `--frames_out` (defaults to `<out>/frames`).
**Images**

```bash
python -m vision.pipelines.image_recognition --input ./data/media/images --out ./data/samples/hot/vision --annotate --frames_out ./data/samples/hot/vision/frames
```

**Video**

```bash
python -m vision.pipelines.video_recognition --input ./data/media/video/sample.mp4 --out ./data/samples/hot/vision --every_ms 500 --annotate --frames_out ./data/samples/hot/vision/frames
```
After running the image/video pipelines (optionally with `--annotate`), create a partition manifest:

```bash
python tools/update_manifest.py --data-root . --outdir ./data/samples/hot/vision --site A --device D --topic vision --date $(date +%F) --hour $(date +%H)
```

This writes `data/manifests/site=A/device=D/topic=vision/date=YYYY-MM-DD/hour=HH/MANIFEST.json` including (see the verification sketch after this list):
- SHA-256 for detection JSONL and annotated PNG frames
- A Merkle root across all files
- Linkage index: detection file → list of annotated frames referenced inside
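Given such a manifest, an offline integrity check only needs to re-hash each file and compare. The sketch below assumes a layout with `files` (relative path → SHA-256 hex) and `merkle_root` keys, which may not match the actual MANIFEST.json schema:

```python
import hashlib
import json
from pathlib import Path

manifest_path = Path(
    "data/manifests/site=A/device=D/topic=vision/date=YYYY-MM-DD/hour=HH/MANIFEST.json"
)
manifest = json.loads(manifest_path.read_text(encoding="utf-8"))

# Assumed layout: {"files": {"<relative path>": "<sha256 hex>", ...}, "merkle_root": "<hex>"}
for relpath, expected in manifest["files"].items():
    actual = hashlib.sha256(Path(relpath).read_bytes()).hexdigest()
    print(("OK      " if actual == expected else "TAMPERED"), relpath)
```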
You can commit each partition's Merkle root on-chain via an `OP_RETURN` output.
Dev note: Use testnet while developing. Mainnet costs real BTC.
Anchor a manifest (bitcoind JSON-RPC):

```bash
python tools/anchor_bitcoin.py \
  --manifest data/manifests/site=A/device=D/topic=vision/date=YYYY-MM-DD/hour=HH/MANIFEST.json \
  --network testnet \
  --rpc-url http://127.0.0.1:18332 \
  --rpc-user <user> --rpc-pass <pass> \
  --wallet <wallet_name> \
  --fee-satvB 10
```
This updates the manifest with:

```json
"anchor": {
  "network": "testnet",
  "txid": "<txid>",
  "anchored_utc": "...",
  "op_return_hex": "EAD1<merkle_root>"
}
```
Verify (via node RPC, or a fallback REST API):

```bash
python tools/verify_anchor.py \
  --manifest data/manifests/site=A/device=D/topic=vision/date=YYYY-MM-DD/hour=HH/MANIFEST.json \
  --network testnet \
  --rpc-url http://127.0.0.1:18332 --rpc-user <user> --rpc-pass <pass> --wallet <wallet_name>
```
The `OP_RETURN` payload is `EAD1` + the 32-byte `merkle_root` from the manifest.
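Reconstructing and checking that payload takes only a few lines; the sketch assumes the manifest stores `merkle_root` as a 64-character hex string:

```python
import json

path = "data/manifests/site=A/device=D/topic=vision/date=YYYY-MM-DD/hour=HH/MANIFEST.json"
with open(path, encoding="utf-8") as f:
    manifest = json.load(f)

# Payload = 2-byte 0xEAD1 tag + 32-byte Merkle root, hex-encoded for OP_RETURN.
payload_hex = "EAD1" + manifest["merkle_root"]
assert len(bytes.fromhex(payload_hex)) == 34  # 2 tag bytes + 32 root bytes

# After anchoring, the recorded value should match exactly.
if "anchor" in manifest:
    assert payload_hex.lower() == manifest["anchor"]["op_return_hex"].lower()
```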
To integrate a new machine type:

- Configure the device's data acquisition drivers (e.g., sensor SDK, OPC UA client).
- Point the data output to `data/samples/...` in JSONL format.
- Validate using:

  ```bash
  python tools/validate_jsonl.py --schema ./schema/temperature.schema.json --input ./data/samples/hot/temperature/{date_str}/temperature-{date_str}T{hour_str}-00.jsonl
  ```

- (Optional) Extend `adapters/` with your device-specific protocol handler; a minimal adapter sketch follows below.
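As a starting point for such an extension, a device adapter can be little more than a loop that polls the vendor SDK and appends schema-conformant JSONL; `read_my_sensor` below is a stand-in for your driver call:

```python
import json, time, datetime
from pathlib import Path

def read_my_sensor() -> float:
    """Stand-in for your vendor SDK / driver call."""
    raise NotImplementedError

out = Path("data/samples/hot/temperature")
out.mkdir(parents=True, exist_ok=True)

while True:
    now = datetime.datetime.now(datetime.timezone.utc)
    record = {"ts": now.isoformat(), "topic": "temperature",
              "value_c": read_my_sensor()}
    # Partitioned path matching the validate_jsonl example above.
    day_file = out / f"{now:%Y-%m-%d}" / f"temperature-{now:%Y-%m-%dT%H}-00.jsonl"
    day_file.parent.mkdir(parents=True, exist_ok=True)
    with day_file.open("a", encoding="utf-8") as f:
        f.write(json.dumps(record) + "\n")
    time.sleep(1.0)
```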
Pull requests are welcome! Please run linting and tests before submitting:

```bash
ruff check .
pytest
```
⚠️ Note: This project assumes the target machine runs Linux (Ubuntu 20.04+ or Debian-based) with Python 3.9+. Windows can be supported with minor path/glob adjustments.