Skip to content

poc: tedge MEA DB #3725

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 50 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
50 commits
Select commit Hold shift + click to select a range
a8ee90f
Add a new mapper: tedge-gen-mapper
didier-wenzek Mar 24, 2025
4e1613c
Sketch message transformation pipelines
didier-wenzek Mar 25, 2025
e58c8cc
Run JS transformation stages
didier-wenzek May 14, 2025
97c46a8
Update deprecated call tempfile into_path -> keep
didier-wenzek May 16, 2025
1b59365
Avoid to deserialize MQTT at each pipeline stage.
didier-wenzek May 19, 2025
046db7e
Add JS example: drop stragglers
didier-wenzek May 19, 2025
cbc345a
Add ability to configure each stages
didier-wenzek May 19, 2025
c86ee6c
Add JS example: collectd pipeline
didier-wenzek May 20, 2025
c56c290
Add JS example: thin-edge JSON to c8y JSON
didier-wenzek May 20, 2025
5fa5cdd
Fix tests broken by cargo update
didier-wenzek May 20, 2025
09887c5
Update JS filter config using MQTT
didier-wenzek May 20, 2025
42a3fff
Filters can defer messages up to the end of a time window
didier-wenzek May 20, 2025
b61f9d0
Tedge-gen-mapper dynamically reloads pipelines and filters
jarhodes314 May 21, 2025
e0a367c
Tedge-gen-mapper dynamically subscribes to MQTT topics
didier-wenzek May 26, 2025
7d29614
Test JS example: collectd pipeline
didier-wenzek May 28, 2025
edb5e0f
Tidy JS example
jarhodes314 May 30, 2025
35510ee
Relay error messages from Javascript to the generic mapper logs
jarhodes314 May 30, 2025
04ec637
fixup! fixup! Add JS example: collectd pipline
jarhodes314 May 30, 2025
d9b2ed5
fixup! fixup! Add JS example: collectd pipline
jarhodes314 May 30, 2025
2e797ab
Fix memory leak
didier-wenzek Jun 4, 2025
e827a12
Add benchmark options to tedge mqtt pub
didier-wenzek Jun 4, 2025
c19623f
Add gen-mapper system tests
didier-wenzek Jun 6, 2025
61270b4
Add a reference guide
didier-wenzek Jun 6, 2025
4edc775
Implement a circuit breaker filter
didier-wenzek Jun 20, 2025
96a056f
Add a filter to compute average over a time window
didier-wenzek Jun 20, 2025
44873ae
Extract message processing from gen-mapper actor
didier-wenzek Jun 25, 2025
819e44c
Add cli commands to test user-defined mappings
didier-wenzek Jun 26, 2025
a104b2b
Simplify gen-mapper subscription updates
didier-wenzek Jun 26, 2025
1cf271c
Gen mapper dynamically load/reload/remove pipelines and filters
didier-wenzek Jun 26, 2025
9024cef
Index pipelines using full paths not just filenames
didier-wenzek Jun 26, 2025
4117d4a
tedge mapping test consumes messages from stdin
didier-wenzek Jun 27, 2025
0fffac6
Test and fix te_2_c8y.js
didier-wenzek Jun 27, 2025
f5c21fd
Test dynamic update of filter config
didier-wenzek Jun 27, 2025
a75e447
Update system tests Setup
didier-wenzek Jun 27, 2025
3f3160e
Support for testing the tick function of a filter
didier-wenzek Jun 30, 2025
214ccc3
Add support for console.log()
didier-wenzek Jun 30, 2025
d798cb8
Tedge mapping test a single single filter
didier-wenzek Jul 1, 2025
094119f
Fix JS engine with one module per filter instance
didier-wenzek Jul 1, 2025
cae0eaa
use config_dir variable to get configuration file location
reubenmiller Jul 7, 2025
b184a82
Gen-mapper pipelines can drain data out MeaDB
didier-wenzek Jul 7, 2025
cba0b4b
Gen-mapper pipelines can persist data in MeaDB
didier-wenzek Jul 7, 2025
663d408
add initial mea database implementation to the generic mapper
jarhodes314 Jul 7, 2025
3f434ef
fixup! add initial mea database implementation to the generic mapper
didier-wenzek Jul 7, 2025
fdd578d
Add example: pipeline using mea-db
didier-wenzek Jul 7, 2025
3e8bbf4
Fix segfault
jarhodes314 Jul 8, 2025
a6dc1ec
Remove oldest message timestamp cache
jarhodes314 Jul 8, 2025
8b9ce32
Distinguish two message sources: MQTT vs DB
didier-wenzek Jul 8, 2025
3723b11
fixup! Distinguish two message sources: MQTT vs DB
didier-wenzek Jul 8, 2025
44968f7
Add db_dump bin to read and display db contents
Bravo555 Jul 8, 2025
4ec15fb
Fix cascading pipelines
didier-wenzek Jul 8, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1,900 changes: 1,185 additions & 715 deletions Cargo.lock

Large diffs are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ tedge_config_macros-impl = { path = "crates/common/tedge_config_macros/impl" }
tedge_config_manager = { path = "crates/extensions/tedge_config_manager" }
tedge_downloader_ext = { path = "crates/extensions/tedge_downloader_ext" }
tedge_file_system_ext = { path = "crates/extensions/tedge_file_system_ext" }
tedge_gen_mapper = { path = "crates/extensions/tedge_gen_mapper" }
tedge_health_ext = { path = "crates/extensions/tedge_health_ext" }
tedge_http_ext = { path = "crates/extensions/tedge_http_ext" }
tedge_log_manager = { path = "crates/extensions/tedge_log_manager" }
Expand Down Expand Up @@ -109,6 +110,7 @@ env_logger = "0.10"
fastrand = "2.0"
figment = { version = "0.10" }
filetime = "0.2"
fjall = "2.11"
flate2 = "1.1.1"
freedesktop_entry_parser = "1.3.0"
futures = "0.3"
Expand Down Expand Up @@ -163,6 +165,7 @@ regex = "1.4"
reqwest = { version = "0.12", default-features = false }
ron = "0.8"
rpassword = "5.0"
rquickjs = { version = "0.9", default-features = false }
rstest = "0.16.0"
rumqttc = { git = "https://github.com/jarhodes314/rumqtt", rev = "8c489faf6af910956c97b55587ff3ecb2ac4e96f" }
rumqttd = "0.19"
Expand Down
2 changes: 1 addition & 1 deletion ci/build_scripts/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ BUILD_WITH="${BUILD_WITH:-zig}"
COMMON_BUILD_OPTIONS=(
"--release"
)
TOOLCHAIN="${TOOLCHAIN:-+1.78}"
TOOLCHAIN="${TOOLCHAIN:-+1.82}"
# Note: Minimum version that is supported with riscv64gc-unknown-linux-gnu is 2.27
GLIBC_VERSION="${GLIBC_VERSION:-2.17}"
RISCV_GLIBC_VERSION="${RISCV_GLIBC_VERSION:-2.27}"
Expand Down
9 changes: 7 additions & 2 deletions crates/common/mqtt_channel/src/topics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,10 +117,15 @@ impl TopicFilter {
}

/// Check if the given topic matches this filter pattern.
pub fn accept_topic(&self, topic: &Topic) -> bool {
pub fn accept_topic_name(&self, topic: &str) -> bool {
self.patterns
.iter()
.any(|pattern| rumqttc::matches(&topic.name, pattern))
.any(|pattern| rumqttc::matches(topic, pattern))
}

/// Check if the given topic matches this filter pattern.
pub fn accept_topic(&self, topic: &Topic) -> bool {
self.accept_topic_name(&topic.name)
}

/// Check if the given message matches this filter pattern.
Expand Down
2 changes: 1 addition & 1 deletion crates/core/plugin_sm/src/operation_logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ mod tests {
let unrelated_2 = create_file(log_dir.path(), "bar");

// Open the log dir
let _operation_logs = OperationLogs::try_new(log_dir.into_path().try_into().unwrap())?;
let _operation_logs = OperationLogs::try_new(log_dir.keep().try_into().unwrap())?;

// Outdated logs are removed
assert!(!update_log_1.exists());
Expand Down
1 change: 1 addition & 0 deletions crates/core/tedge/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ tedge-watchdog = { workspace = true }
tedge-write = { workspace = true }
tedge_api = { workspace = true }
tedge_config = { workspace = true }
tedge_gen_mapper = { workspace = true }
tedge_utils = { workspace = true }
thiserror = { workspace = true }
time = { workspace = true }
Expand Down
126 changes: 126 additions & 0 deletions crates/core/tedge/src/cli/mapping/cli.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
use crate::cli::mapping::list::ListCommand;
use crate::cli::mapping::test::TestCommand;
use crate::command::BuildCommand;
use crate::command::Command;
use crate::ConfigError;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use std::path::PathBuf;
use tedge_config::TEdgeConfig;
use tedge_gen_mapper::pipeline::Message;
use tedge_gen_mapper::MessageProcessor;

#[derive(clap::Subcommand, Debug)]
pub enum TEdgeMappingCli {
/// List pipelines and filters
List {
/// Path to pipeline and filter specs
///
/// Default to /etc/tedge/gen-mapper
#[clap(long)]
mapping_dir: Option<PathBuf>,

/// List pipelines processing messages published on this topic
///
/// If none is provided, lists all the pipelines
#[clap(long)]
topic: Option<String>,
},

/// Process message samples
Test {
/// Path to pipeline and filter specs
///
/// Default to /etc/tedge/gen-mapper
#[clap(long)]
mapping_dir: Option<PathBuf>,

/// Path to the javascript filter or TOML pipeline definition
///
/// If none is provided, applies all the matching pipelines
#[clap(long)]
filter: Option<PathBuf>,

/// Send a tick after all the message samples
#[clap(long = "final-tick")]
final_tick: bool,

/// Topic of the message sample
///
/// If none is provided, messages are read from stdin expecting a line per message:
/// [topic] payload
topic: Option<String>,

/// Payload of the message sample
///
/// If none is provided, payloads are read from stdin
payload: Option<String>,
},
}

impl BuildCommand for TEdgeMappingCli {
fn build_command(self, config: &TEdgeConfig) -> Result<Box<dyn Command>, ConfigError> {
match self {
TEdgeMappingCli::List { mapping_dir, topic } => {
let mapping_dir = mapping_dir.unwrap_or_else(|| Self::default_mapping_dir(config));
Ok(ListCommand { mapping_dir, topic }.into_boxed())
}

TEdgeMappingCli::Test {
mapping_dir,
filter,
final_tick,
topic,
payload,
} => {
let mapping_dir = mapping_dir.unwrap_or_else(|| Self::default_mapping_dir(config));
let message = match (topic, payload) {
(Some(topic), Some(payload)) => Some(Message { topic, payload }),
(Some(_), None) => Err(anyhow!("Missing sample payload"))?,
(None, Some(_)) => Err(anyhow!("Missing sample topic"))?,
(None, None) => None,
};
Ok(TestCommand {
mapping_dir,
filter,
message,
final_tick,
}
.into_boxed())
}
}
}
}

impl TEdgeMappingCli {
fn default_mapping_dir(config: &TEdgeConfig) -> PathBuf {
config.root_dir().join("gen-mapper").into()
}

pub async fn load_pipelines(mapping_dir: &PathBuf) -> Result<MessageProcessor, Error> {
MessageProcessor::try_new(mapping_dir)
.await
.with_context(|| {
format!(
"loading pipelines and filters from {}",
mapping_dir.display()
)
})
}

pub async fn load_filter(
mapping_dir: &PathBuf,
path: &PathBuf,
) -> Result<MessageProcessor, Error> {
if let Some("toml") = path.extension().and_then(|s| s.to_str()) {
MessageProcessor::try_new_single_pipeline(mapping_dir, path)
.await
.with_context(|| format!("loading pipeline {pipeline}", pipeline = path.display()))
} else {
MessageProcessor::try_new_single_filter(mapping_dir, path)
.await
.with_context(|| format!("loading filter {filter}", filter = path.display()))
}
}
}
47 changes: 47 additions & 0 deletions crates/core/tedge/src/cli/mapping/list.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
use crate::cli::mapping::TEdgeMappingCli;
use crate::command::Command;
use crate::log::MaybeFancy;
use anyhow::Error;
use std::path::PathBuf;
use tedge_config::TEdgeConfig;
use tedge_gen_mapper::pipeline::Pipeline;

pub struct ListCommand {
pub mapping_dir: PathBuf,
pub topic: Option<String>,
}

#[async_trait::async_trait]
impl Command for ListCommand {
fn description(&self) -> String {
format!(
"list pipelines and filters in {:}",
self.mapping_dir.display()
)
}

async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
let processor = TEdgeMappingCli::load_pipelines(&self.mapping_dir).await?;

match &self.topic {
Some(topic) => processor
.pipelines
.iter()
.filter(|(_, pipeline)| pipeline.topics().accept_topic_name(topic))
.for_each(Self::display),

None => processor.pipelines.iter().for_each(Self::display),
}

Ok(())
}
}

impl ListCommand {
fn display((pipeline_id, pipeline): (&String, &Pipeline)) {
println!("{pipeline_id}");
for stage in pipeline.stages.iter() {
println!("\t{}", stage.filter.path.display());
}
}
}
5 changes: 5 additions & 0 deletions crates/core/tedge/src/cli/mapping/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
mod cli;
mod list;
mod test;

pub use cli::TEdgeMappingCli;
135 changes: 135 additions & 0 deletions crates/core/tedge/src/cli/mapping/test.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
use crate::cli::mapping::TEdgeMappingCli;
use crate::command::Command;
use crate::log::MaybeFancy;
use anyhow::Error;
use std::path::PathBuf;
use tedge_config::TEdgeConfig;
use tedge_gen_mapper::pipeline::*;
use tedge_gen_mapper::MessageProcessor;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Stdin;

pub struct TestCommand {
pub mapping_dir: PathBuf,
pub filter: Option<PathBuf>,
pub message: Option<Message>,
pub final_tick: bool,
}

#[async_trait::async_trait]
impl Command for TestCommand {
fn description(&self) -> String {
format!(
"process message samples using pipelines and filters in {:}",
self.mapping_dir.display()
)
}

async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
let mut processor = match &self.filter {
None => TEdgeMappingCli::load_pipelines(&self.mapping_dir).await?,
Some(filter) => TEdgeMappingCli::load_filter(&self.mapping_dir, filter).await?,
};
if let Some(message) = &self.message {
let timestamp = DateTime::now();
self.process(&mut processor, message, &timestamp).await;
} else {
let mut stdin = BufReader::new(tokio::io::stdin());
while let Some(message) = next_message(&mut stdin).await {
let timestamp = DateTime::now();
self.process(&mut processor, &message, &timestamp).await;
}
}
if self.final_tick {
let timestamp = DateTime::now();
self.tick(&mut processor, &timestamp).await;
}
Ok(())
}
}

impl TestCommand {
async fn process(
&self,
processor: &mut MessageProcessor,
message: &Message,
timestamp: &DateTime,
) {
processor
.process(MessageSource::MQTT, timestamp, message)
.await
.into_iter()
.map(|(_, v)| v)
.for_each(print)
}

async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) {
processor
.tick(timestamp)
.await
.into_iter()
.map(|(_, v)| v)
.for_each(print)
}
}

fn print(messages: Result<Vec<Message>, FilterError>) {
match messages {
Ok(messages) => {
for message in messages {
println!("[{}] {}", message.topic, message.payload);
}
}
Err(err) => {
eprintln!("Error: {}", err)
}
}
}

fn parse(line: String) -> Result<Option<Message>, Error> {
let line = line.trim();
if line.is_empty() {
return Ok(None);
}
if !line.starts_with("[") {
return Err(anyhow::anyhow!("Missing opening bracket: {}", line));
}
let Some(closing_bracket) = line.find(']') else {
return Err(anyhow::anyhow!("Missing closing bracket: {}", line));
};

let topic = line[1..closing_bracket].to_string();
let payload = line[closing_bracket + 1..].to_string();

Ok(Some(Message { topic, payload }))
}

async fn next_line(input: &mut BufReader<Stdin>) -> Option<String> {
loop {
let mut line = String::new();
match input.read_line(&mut line).await {
Ok(0) => return None,
Ok(_) => {
let line = line.trim();
if !line.is_empty() {
return Some(line.to_string());
}
}
Err(err) => {
eprintln!("Fail to read input stream {}", err);
return None;
}
}
}
}
async fn next_message(input: &mut BufReader<Stdin>) -> Option<Message> {
let line = next_line(input).await?;
match parse(line) {
Ok(message) => message,
Err(err) => {
eprintln!("Fail to parse input message {}", err);
None
}
}
}
Loading
Loading