Skip to content

poc: tedge MEA DB #3725

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 50 commits into
base: main
Choose a base branch
from
Draft

Conversation

didier-wenzek
Copy link
Contributor

@didier-wenzek didier-wenzek commented Jul 7, 2025

Proposed changes

Thin-edge currently delegates to the MQTT broker all message-persistence requirements

  • It's work well but lacks flexibility
  • If a network outage is long enough to fill the broker database, there is no way to choose which messages can be discarded

Can thin-edge be extended with a Round Robin Database (or similar) to accumulate MEA messages?
Adding the ability to publish consolidated data to the cloud or a data lake.

Idea

It would be nice if users where able to define themselves persistence rules for messages that cannot be processed right now

  • Keep all recent measurements (say from the last hour)
  • Reduce the numbers of measurements if too many (say be keeping 5 minutes averages for the current day)
  • Archive coarse-grained average for a longer period of time
  • Discard messages definitely too old

Demo

  • Use the generic mapper and user-provided rules to filter, transform and aggregate messages
  • Persist messages in an MEA database storing series of messages
  • Drain messages out of a series when too old
  • Process drained messages with the generic mapper: aggregate, discard ...

Next steps

Connect the builtin bridge to the MEA DB.

  • Consume local messages no more directly from MQTT but from MEA series
  • Drain messages from the DB when acknowledged by the cloud
  • Let messages pile up when the network is down
  • Use user-defined rules to reduce the number of pending messages if the outage lasts

Types of changes

  • Bugfix (non-breaking change which fixes an issue)
  • New feature (non-breaking change which adds functionality)
  • Improvement (general improvements like code refactoring that doesn't explicitly fix a bug or add any new functionality)
  • Documentation Update (if none of the other choices apply)
  • Breaking change (fix or feature that would cause existing functionality to not work as expected)

Paste Link to the issue

Checklist

  • I have read the CONTRIBUTING doc
  • I have signed the CLA (in all commits with git commit -s. You can activate automatic signing by running just prepare-dev once)
  • I ran just format as mentioned in CODING_GUIDELINES
  • I used just check as mentioned in CODING_GUIDELINES
  • I have added tests that prove my fix is effective or that my feature works
  • I have added necessary documentation (if appropriate)

Further comments

didier-wenzek and others added 30 commits June 27, 2025 16:10
This commit is a very first step, scaffolding a generic mapper.
The aim is to let users define their own mapping rules
to tranform, filter or enrich data received from various sources.

The idea is to form pipelines of user-provided transformation functions that:

- consume messages from MQTT,
- stream these messages along the transformation stages,
- publish back to MQTT the resulting messages.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
The first design was naive (i.e. loading the javascript module
for each and every function call) and leading to a memory leak
(loading a module with the same name keeps the previous version of the
module in memory).

The new design fixes the issue. The memory is stable while translating to c8y
600 tedge measurements per second during 20 minutes (mode debug)

```shell
$ watch ps -p 1382888 -o args,%cpu,etimes,times,%mem,rss,vsz
COMMAND                     %CPU ELAPSED     TIME %MEM   RSS    VSZ
tedge-mapper gen            47.4    1322      627  0.0 41440 1359476

$ tedge mqtt sub 'test/output' --duration 1000 | pv | wc -l
89.3MiB 0:16:40 [91.4KiB/s] [  <=>                                                                                                                            ]
586311
```

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
The goal is to be able to use the message processor from within the
tedge cli command, notably for testing pipelines and filters.

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Still to be improved: some event are processed twice
- `mv pipeline.toml /tmp` leads to update then remove
- `rm pipeline.toml` correctly triggers only remove
- `cp /tmp/pipeline.toml .` triggers creation and update

Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
@didier-wenzek didier-wenzek marked this pull request as draft July 7, 2025 13:17
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
Signed-off-by: Marcel Guzik <marcel.guzik@cumulocity.com>
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
@didier-wenzek didier-wenzek self-assigned this Jul 9, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants