poc: Extensible Mapper using JavaScript #3650

Draft
wants to merge 53 commits into
base: main
2d5770f
Add a new mapper: tedge-gen-mapper
didier-wenzek Mar 24, 2025
cf18156
Sketch message transformation pipelines
didier-wenzek Mar 25, 2025
469b0f3
Run JS transformation stages
didier-wenzek May 14, 2025
a63206f
Update deprecated call tempfile into_path -> keep
didier-wenzek May 16, 2025
f092635
Avoid to deserialize MQTT at each pipeline stage.
didier-wenzek May 19, 2025
5871c21
Add JS example: drop stragglers
didier-wenzek May 19, 2025
88ad9c4
Add ability to configure each stages
didier-wenzek May 19, 2025
cb656fe
Add JS example: collectd pipeline
didier-wenzek May 20, 2025
a3a140f
Add JS example: thin-edge JSON to c8y JSON
didier-wenzek May 20, 2025
5d0ffd1
Fix tests broken by cargo update
didier-wenzek May 20, 2025
9a84b39
Update JS filter config using MQTT
didier-wenzek May 20, 2025
7f8f966
Filters can defer messages up to the end of a time window
didier-wenzek May 20, 2025
efde5fe
Tedge-gen-mapper dynamically reloads pipelines and filters
jarhodes314 May 21, 2025
185076d
Tedge-gen-mapper dynamically subscribes to MQTT topics
didier-wenzek May 26, 2025
50038c9
Test JS example: collectd pipeline
didier-wenzek May 28, 2025
56480dd
Tidy JS example
jarhodes314 May 30, 2025
779ff7f
Relay error messages from Javascript to the generic mapper logs
jarhodes314 May 30, 2025
1c1e9e2
fixup! fixup! Add JS example: collectd pipline
jarhodes314 May 30, 2025
f96d74b
fixup! fixup! Add JS example: collectd pipline
jarhodes314 May 30, 2025
0b2ad88
Fix memory leak
didier-wenzek Jun 4, 2025
b5faa99
Add benchmark options to tedge mqtt pub
didier-wenzek Jun 4, 2025
b97ab44
Add gen-mapper system tests
didier-wenzek Jun 6, 2025
7673515
Add a reference guide
didier-wenzek Jun 6, 2025
2a40620
Implement a circuit breaker filter
didier-wenzek Jun 20, 2025
f797861
Add a filter to compute average over a time window
didier-wenzek Jun 20, 2025
ed33143
Extract message processing from gen-mapper actor
didier-wenzek Jun 25, 2025
b2233a4
Add cli commands to test user-defined mappings
didier-wenzek Jun 26, 2025
8ccf516
Simplify gen-mapper subscription updates
didier-wenzek Jun 26, 2025
883efe8
Gen mapper dynamically load/reload/remove pipelines and filters
didier-wenzek Jun 26, 2025
61ef1e3
Index pipelines using full paths not just filenames
didier-wenzek Jun 26, 2025
a7108e3
tedge mapping test consumes messages from stdin
didier-wenzek Jun 27, 2025
db0f9a7
Test and fix te_2_c8y.js
didier-wenzek Jun 27, 2025
f36568c
Test dynamic update of filter config
didier-wenzek Jun 27, 2025
203f21e
Update system tests Setup
didier-wenzek Jun 27, 2025
024fe6f
Support for testing the tick function of a filter
didier-wenzek Jun 30, 2025
59a3279
Add support for console.log()
didier-wenzek Jun 30, 2025
5850d6e
Tedge mapping test a single single filter
didier-wenzek Jul 1, 2025
5b137e8
Fix JS engine with one module per filter instance
didier-wenzek Jul 1, 2025
c26ed7b
use config_dir variable to get configuration file location
reubenmiller Jul 7, 2025
ebd8af2
The default config of flow transformation is {}
didier-wenzek Jul 10, 2025
7c2049f
Warn when javascript module and config are not consistent
didier-wenzek Jul 10, 2025
d3df515
Trigger ticks every seconds per default when a tick handler is provided
didier-wenzek Jul 10, 2025
d23100e
fixup! Warn when javascript module and config are not consistent
didier-wenzek Jul 11, 2025
c111bcc
collect message transformation stats
didier-wenzek Jul 11, 2025
34a5fe2
Naming fix: Pipeline -> Flow
didier-wenzek Jul 15, 2025
ec21ead
Naming fix: Stage -> Step
didier-wenzek Jul 15, 2025
208986a
Naming fix: Filter -> Flow Step
didier-wenzek Jul 15, 2025
3e83ba4
fixup! Add a new mapper: tedge-gen-mapper
didier-wenzek Jul 16, 2025
abe12fe
Concept documentation
didier-wenzek Jul 16, 2025
eccec91
zsh:1: command not found: q
didier-wenzek Jul 17, 2025
6641cfc
Set memory usage limits to flow scripts
didier-wenzek Jul 17, 2025
8ff31bf
Flow API: Use more idiomatic Javascript names
didier-wenzek Jul 18, 2025
a45942d
Pass processing time along the message
didier-wenzek Jul 18, 2025
1,709 changes: 996 additions & 713 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions Cargo.toml
@@ -55,6 +55,7 @@ tedge_config_macros-impl = { path = "crates/common/tedge_config_macros/impl" }
 tedge_config_manager = { path = "crates/extensions/tedge_config_manager" }
 tedge_downloader_ext = { path = "crates/extensions/tedge_downloader_ext" }
 tedge_file_system_ext = { path = "crates/extensions/tedge_file_system_ext" }
+tedge_gen_mapper = { path = "crates/extensions/tedge_gen_mapper" }
 tedge_health_ext = { path = "crates/extensions/tedge_health_ext" }
 tedge_http_ext = { path = "crates/extensions/tedge_http_ext" }
 tedge_log_manager = { path = "crates/extensions/tedge_log_manager" }
@@ -164,6 +165,7 @@ regex = "1.4"
 reqwest = { version = "0.12", default-features = false }
 ron = "0.8"
 rpassword = "5.0"
+rquickjs = { version = "0.9", default-features = false }
 rstest = "0.16.0"
 rumqttc = { git = "https://github.com/jarhodes314/rumqtt", rev = "8c489faf6af910956c97b55587ff3ecb2ac4e96f" }
 rumqttd = "0.19"
2 changes: 1 addition & 1 deletion ci/build_scripts/build.sh
@@ -101,7 +101,7 @@ BUILD_WITH="${BUILD_WITH:-zig}"
 COMMON_BUILD_OPTIONS=(
     "--release"
 )
-TOOLCHAIN="${TOOLCHAIN:-+1.78}"
+TOOLCHAIN="${TOOLCHAIN:-+1.82}"
 # Note: Minimum version that is supported with riscv64gc-unknown-linux-gnu is 2.27
 GLIBC_VERSION="${GLIBC_VERSION:-2.17}"
 RISCV_GLIBC_VERSION="${RISCV_GLIBC_VERSION:-2.27}"
13 changes: 11 additions & 2 deletions crates/common/mqtt_channel/src/topics.rs
@@ -116,11 +116,20 @@ impl TopicFilter {
         }
     }

+    pub fn is_empty(&self) -> bool {
+        self.patterns.is_empty()
+    }
+
     /// Check if the given topic matches this filter pattern.
-    pub fn accept_topic(&self, topic: &Topic) -> bool {
+    pub fn accept_topic_name(&self, topic: &str) -> bool {
         self.patterns
             .iter()
-            .any(|pattern| rumqttc::matches(&topic.name, pattern))
+            .any(|pattern| rumqttc::matches(topic, pattern))
     }
+
+    /// Check if the given topic matches this filter pattern.
+    pub fn accept_topic(&self, topic: &Topic) -> bool {
+        self.accept_topic_name(&topic.name)
+    }

     /// Check if the given message matches this filter pattern.
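
The hunk above splits topic matching into a string-based `accept_topic_name` and a thin `accept_topic` wrapper. A minimal, standard-library-only sketch of that delegation follows. The `matches` function here is a simplified stand-in written for this illustration, not the `rumqttc::matches` implementation: it handles `+` and `#` level by level and ignores `$`-prefixed topics and pattern validation. The `Topic` and `TopicFilter` structs are reduced to the fields the sketch needs.

```rust
/// Simplified MQTT-style matching (assumption: illustration only, not rumqttc's code).
/// `+` matches exactly one level; `#` matches the rest of the topic, including nothing.
fn matches(topic: &str, pattern: &str) -> bool {
    let mut topic_levels = topic.split('/');
    let mut pattern_levels = pattern.split('/');
    loop {
        match (pattern_levels.next(), topic_levels.next()) {
            (Some("#"), _) => return true,
            (Some("+"), Some(_)) => continue,
            (Some(p), Some(t)) if p == t => continue,
            (None, None) => return true,
            _ => return false,
        }
    }
}

/// Reduced stand-in for the crate's `Topic` type.
struct Topic {
    name: String,
}

/// Reduced stand-in for the crate's `TopicFilter` type.
struct TopicFilter {
    patterns: Vec<String>,
}

impl TopicFilter {
    fn is_empty(&self) -> bool {
        self.patterns.is_empty()
    }

    /// String-based entry point: usable when only a topic name is at hand.
    fn accept_topic_name(&self, topic: &str) -> bool {
        self.patterns.iter().any(|pattern| matches(topic, pattern))
    }

    /// Typed entry point, delegating to the string-based one.
    fn accept_topic(&self, topic: &Topic) -> bool {
        self.accept_topic_name(&topic.name)
    }
}

fn main() {
    let filter = TopicFilter {
        patterns: vec!["te/+/one".to_string(), "c8y/#".to_string()],
    };
    let topic = Topic {
        name: "te/example/one".to_string(),
    };
    println!("typed topic accepted: {}", filter.accept_topic(&topic));
    println!("name accepted: {}", filter.accept_topic_name("te/example/two"));
    println!("filter is empty: {}", filter.is_empty());
}
```

Keeping the `&str` variant as the primitive lets callers such as the `tedge mapping list --topic` command match a user-supplied string without first building a `Topic`.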
2 changes: 1 addition & 1 deletion crates/core/plugin_sm/src/operation_logs.rs
@@ -152,7 +152,7 @@ mod tests {
         let unrelated_2 = create_file(log_dir.path(), "bar");

         // Open the log dir
-        let _operation_logs = OperationLogs::try_new(log_dir.into_path().try_into().unwrap())?;
+        let _operation_logs = OperationLogs::try_new(log_dir.keep().try_into().unwrap())?;

         // Outdated logs are removed
         assert!(!update_log_1.exists());
1 change: 1 addition & 0 deletions crates/core/tedge/Cargo.toml
@@ -54,6 +54,7 @@ tedge-watchdog = { workspace = true }
 tedge-write = { workspace = true }
 tedge_api = { workspace = true }
 tedge_config = { workspace = true }
+tedge_gen_mapper = { workspace = true }
 tedge_utils = { workspace = true }
 thiserror = { workspace = true }
 time = { workspace = true }
121 changes: 121 additions & 0 deletions crates/core/tedge/src/cli/mapping/cli.rs
@@ -0,0 +1,121 @@
use crate::cli::mapping::list::ListCommand;
use crate::cli::mapping::test::TestCommand;
use crate::command::BuildCommand;
use crate::command::Command;
use crate::ConfigError;
use anyhow::anyhow;
use anyhow::Context;
use anyhow::Error;
use std::path::PathBuf;
use tedge_config::TEdgeConfig;
use tedge_gen_mapper::flow::Message;
use tedge_gen_mapper::MessageProcessor;

#[derive(clap::Subcommand, Debug)]
pub enum TEdgeMappingCli {
    /// List flows and steps
    List {
        /// Path to the directory of flows and steps
        ///
        /// Default to /etc/tedge/gen-mapper
        #[clap(long)]
        mapping_dir: Option<PathBuf>,

        /// List flows processing messages published on this topic
        ///
        /// If none is provided, lists all the flows
        #[clap(long)]
        topic: Option<String>,
    },

    /// Process message samples
    Test {
        /// Path to the directory of flows and steps
        ///
        /// Default to /etc/tedge/gen-mapper
        #[clap(long)]
        mapping_dir: Option<PathBuf>,

        /// Path to the flow step script or TOML flow definition
        ///
        /// If none is provided, applies all the matching flows
        #[clap(long)]
        flow: Option<PathBuf>,

        /// Send a tick after all the message samples
        #[clap(long = "final-tick")]
        final_tick: bool,

        /// Topic of the message sample
        ///
        /// If none is provided, messages are read from stdin expecting a line per message:
        /// [topic] payload
Contributor

This notation can be slightly confusing: on the command line, [xxx] generally means "optional", whereas the topic is mandatory.

Though I assume [] was chosen because that is how we format the messages on the console from tedge mqtt sub. So given that, maybe we can just provide an example in addition to the syntax,

e.g.

/// [topic] payload
/// For example,
/// [te/example/one] {"text":"test message"}

Contributor Author

@didier-wenzek didier-wenzek Jul 2, 2025

Yes, the bracketed topic syntax was chosen so that the output of tedge mqtt sub can be piped into tedge mapping test.

However, if the topic is provided on the command line then no brackets are expected:

$ tedge mapping test te/example/one '{"text":"test message"}'

        topic: Option<String>,

        /// Payload of the message sample
        ///
        /// If none is provided, payloads are read from stdin
        payload: Option<String>,
    },
}

impl BuildCommand for TEdgeMappingCli {
    fn build_command(self, config: &TEdgeConfig) -> Result<Box<dyn Command>, ConfigError> {
        match self {
            TEdgeMappingCli::List { mapping_dir, topic } => {
                let mapping_dir = mapping_dir.unwrap_or_else(|| Self::default_mapping_dir(config));
                Ok(ListCommand { mapping_dir, topic }.into_boxed())
            }

            TEdgeMappingCli::Test {
                mapping_dir,
                flow,
                final_tick,
                topic,
                payload,
            } => {
                let mapping_dir = mapping_dir.unwrap_or_else(|| Self::default_mapping_dir(config));
                let message = match (topic, payload) {
                    (Some(topic), Some(payload)) => Some(Message { topic, payload }),
                    (Some(_), None) => Err(anyhow!("Missing sample payload"))?,
                    (None, Some(_)) => Err(anyhow!("Missing sample topic"))?,
                    (None, None) => None,
                };
                Ok(TestCommand {
                    mapping_dir,
                    flow,
                    message,
                    final_tick,
                }
                .into_boxed())
            }
        }
    }
}

impl TEdgeMappingCli {
    fn default_mapping_dir(config: &TEdgeConfig) -> PathBuf {
        config.root_dir().join("gen-mapper").into()
    }

    pub async fn load_flows(mapping_dir: &PathBuf) -> Result<MessageProcessor, Error> {
        MessageProcessor::try_new(mapping_dir)
            .await
            .with_context(|| format!("loading flows and steps from {}", mapping_dir.display()))
    }

    pub async fn load_file(
        mapping_dir: &PathBuf,
        path: &PathBuf,
    ) -> Result<MessageProcessor, Error> {
        if let Some("toml") = path.extension().and_then(|s| s.to_str()) {
            MessageProcessor::try_new_single_flow(mapping_dir, path)
                .await
                .with_context(|| format!("loading flow {flow}", flow = path.display()))
        } else {
            MessageProcessor::try_new_single_step_flow(mapping_dir, path)
                .await
                .with_context(|| format!("loading flow script {script}", script = path.display()))
        }
    }
}
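
Two decisions in `cli.rs` can be isolated from clap and the mapper crate. One is how the optional `topic` and `payload` arguments resolve into either a single sample or "read from stdin". The other is how the `--flow` path is dispatched on its extension. The sketch below restates both with the standard library only. The `Sample` and `FlowSource` enums and the function names are hypothetical, introduced for this illustration; errors are plain `String`s instead of `anyhow` errors, and the example paths are made up.

```rust
use std::path::Path;

#[derive(Debug, PartialEq)]
struct Message {
    topic: String,
    payload: String,
}

/// Where the message samples come from (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum Sample {
    /// Both topic and payload were given on the command line.
    CommandLine(Message),
    /// Neither was given: `[topic] payload` lines are expected on stdin.
    Stdin,
}

/// Mirrors the `(topic, payload)` match of `build_command`:
/// giving only one of the two arguments is an error.
fn resolve_sample(topic: Option<String>, payload: Option<String>) -> Result<Sample, String> {
    match (topic, payload) {
        (Some(topic), Some(payload)) => Ok(Sample::CommandLine(Message { topic, payload })),
        (Some(_), None) => Err("Missing sample payload".to_string()),
        (None, Some(_)) => Err("Missing sample topic".to_string()),
        (None, None) => Ok(Sample::Stdin),
    }
}

/// Kind of file passed with `--flow` (hypothetical type for this sketch).
#[derive(Debug, PartialEq)]
enum FlowSource {
    /// A `.toml` file is loaded as a flow definition.
    TomlFlow,
    /// Anything else is loaded as a single-step flow script.
    Script,
}

/// Mirrors the extension check of `load_file`.
fn classify(path: &Path) -> FlowSource {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some("toml") => FlowSource::TomlFlow,
        _ => FlowSource::Script,
    }
}

fn main() {
    let both = resolve_sample(Some("te/example/one".into()), Some("{}".into()));
    println!("{both:?}");
    let topic_only = resolve_sample(Some("te/example/one".into()), None);
    println!("{topic_only:?}");
    // Hypothetical file locations, used only to exercise the dispatch.
    println!("{:?}", classify(Path::new("flows/collectd.toml")));
    println!("{:?}", classify(Path::new("flows/te_2_c8y.js")));
}
```

Note that the fallback arm treats every non-`.toml` path, including one without an extension, as a script, which matches the `else` branch in the diff.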
47 changes: 47 additions & 0 deletions crates/core/tedge/src/cli/mapping/list.rs
@@ -0,0 +1,47 @@
use crate::cli::mapping::TEdgeMappingCli;
use crate::command::Command;
use crate::log::MaybeFancy;
use anyhow::Error;
use std::path::PathBuf;
use tedge_config::TEdgeConfig;
use tedge_gen_mapper::flow::Flow;

pub struct ListCommand {
    pub mapping_dir: PathBuf,
    pub topic: Option<String>,
}

#[async_trait::async_trait]
impl Command for ListCommand {
    fn description(&self) -> String {
        format!(
            "list flows and flow steps in {:}",
            self.mapping_dir.display()
        )
    }

    async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
        let processor = TEdgeMappingCli::load_flows(&self.mapping_dir).await?;

        match &self.topic {
            Some(topic) => processor
                .flows
                .iter()
                .filter(|(_, flow)| flow.topics().accept_topic_name(topic))
                .for_each(Self::display),

            None => processor.flows.iter().for_each(Self::display),
        }

        Ok(())
    }
}

impl ListCommand {
    fn display((flow_id, flow): (&String, &Flow)) {
        println!("{flow_id}");
        for step in flow.steps.iter() {
            println!("\t{}", step.script.path.display());
        }
    }
}
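
The listing logic above prints each flow identifier followed by its tab-indented step scripts, optionally restricted to flows subscribed to a given topic. A possible way to exercise that logic without a mapping directory is sketched below. Everything in it is an assumption made for illustration: `Flow` is reduced to two string vectors, topic matching is exact equality instead of the wildcard-aware `accept_topic_name`, the output is collected into a `Vec<String>` rather than printed, and the flow and script names are invented.

```rust
use std::collections::BTreeMap;

/// Reduced stand-in for the mapper's `Flow` type (assumption).
struct Flow {
    topics: Vec<String>,
    steps: Vec<String>,
}

impl Flow {
    /// Exact-match only; the diff delegates to `TopicFilter::accept_topic_name`.
    fn accepts(&self, topic: &str) -> bool {
        self.topics.iter().any(|t| t == topic)
    }
}

/// Returns the lines the list command would print:
/// one line per flow id, then one tab-indented line per step.
fn list_flows(flows: &BTreeMap<String, Flow>, topic: Option<&str>) -> Vec<String> {
    let mut lines = Vec::new();
    for (flow_id, flow) in flows {
        // With no topic given, every flow is listed.
        if topic.map_or(true, |t| flow.accepts(t)) {
            lines.push(flow_id.clone());
            for step in &flow.steps {
                lines.push(format!("\t{step}"));
            }
        }
    }
    lines
}

/// Invented sample data, only used to exercise the sketch.
fn sample_flows() -> BTreeMap<String, Flow> {
    let mut flows = BTreeMap::new();
    flows.insert(
        "measurements.toml".to_string(),
        Flow {
            topics: vec!["te/example/one".to_string()],
            steps: vec!["te_2_c8y.js".to_string()],
        },
    );
    flows.insert(
        "collectd.toml".to_string(),
        Flow {
            topics: vec!["collectd/cpu".to_string()],
            steps: vec!["drop_stragglers.js".to_string(), "average.js".to_string()],
        },
    );
    flows
}

fn main() {
    let flows = sample_flows();
    for line in list_flows(&flows, Some("te/example/one")) {
        println!("{line}");
    }
}
```

Returning the lines instead of printing them keeps the filtering testable; a single loop with an optional predicate also avoids duplicating the iteration in both `match` arms.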
5 changes: 5 additions & 0 deletions crates/core/tedge/src/cli/mapping/mod.rs
@@ -0,0 +1,5 @@
mod cli;
mod list;
mod test;

pub use cli::TEdgeMappingCli;
135 changes: 135 additions & 0 deletions crates/core/tedge/src/cli/mapping/test.rs
@@ -0,0 +1,135 @@
use crate::cli::mapping::TEdgeMappingCli;
use crate::command::Command;
use crate::log::MaybeFancy;
use anyhow::Error;
use std::path::PathBuf;
use tedge_config::TEdgeConfig;
use tedge_gen_mapper::flow::*;
use tedge_gen_mapper::MessageProcessor;
use tokio::io::AsyncBufReadExt;
use tokio::io::BufReader;
use tokio::io::Stdin;

pub struct TestCommand {
    pub mapping_dir: PathBuf,
    pub flow: Option<PathBuf>,
    pub message: Option<Message>,
    pub final_tick: bool,
}

#[async_trait::async_trait]
impl Command for TestCommand {
    fn description(&self) -> String {
        format!(
            "process message samples using flows and steps in {:}",
            self.mapping_dir.display()
        )
    }

    async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
        let mut processor = match &self.flow {
            None => TEdgeMappingCli::load_flows(&self.mapping_dir).await?,
            Some(flow) => TEdgeMappingCli::load_file(&self.mapping_dir, flow).await?,
        };
        if let Some(message) = &self.message {
            let timestamp = DateTime::now();
            self.process(&mut processor, message, &timestamp).await;
        } else {
            let mut stdin = BufReader::new(tokio::io::stdin());
            while let Some(message) = next_message(&mut stdin).await {
                let timestamp = DateTime::now();
                self.process(&mut processor, &message, &timestamp).await;
            }
        }
        if self.final_tick {
            let timestamp = DateTime::now();
            self.tick(&mut processor, &timestamp).await;
        }
        Ok(())
    }
}

impl TestCommand {
    async fn process(
        &self,
        processor: &mut MessageProcessor,
        message: &Message,
        timestamp: &DateTime,
    ) {
        processor
            .process(timestamp, message)
            .await
            .into_iter()
            .map(|(_, v)| v)
            .for_each(print)
    }

    async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) {
        processor
            .tick(timestamp)
            .await
            .into_iter()
            .map(|(_, v)| v)
            .for_each(print)
    }
}

fn print(messages: Result<Vec<Message>, FlowError>) {
    match messages {
        Ok(messages) => {
            for message in messages {
                println!("[{}] {}", message.topic, message.payload);
            }
        }
        Err(err) => {
            eprintln!("Error: {}", err)
        }
    }
}

fn parse(line: String) -> Result<Option<Message>, Error> {
    let line = line.trim();
    if line.is_empty() {
        return Ok(None);
    }
    if !line.starts_with("[") {
        return Err(anyhow::anyhow!("Missing opening bracket: {}", line));
    }
    let Some(closing_bracket) = line.find(']') else {
        return Err(anyhow::anyhow!("Missing closing bracket: {}", line));
    };

    let topic = line[1..closing_bracket].to_string();
    let payload = line[closing_bracket + 1..].to_string();

    Ok(Some(Message { topic, payload }))
}

async fn next_line(input: &mut BufReader<Stdin>) -> Option<String> {
    loop {
        let mut line = String::new();
        match input.read_line(&mut line).await {
            Ok(0) => return None,
            Ok(_) => {
                let line = line.trim();
                if !line.is_empty() {
                    return Some(line.to_string());
                }
            }
            Err(err) => {
                eprintln!("Fail to read input stream {}", err);
                return None;
            }
        }
    }
}
async fn next_message(input: &mut BufReader<Stdin>) -> Option<Message> {
    let line = next_line(input).await?;
    match parse(line) {
        Ok(message) => message,
        Err(err) => {
            eprintln!("Fail to parse input message {}", err);
            None
        }
    }
}
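
Two details of the stdin handling above may be worth a second look during review. As written in the diff, `parse` keeps the space that follows the closing bracket as the first character of the payload, and `next_message` returns `None` on a malformed line, which ends the `while let` loop in `execute` instead of moving on to the next line. The sketch below shows one possible alternative under stated assumptions: it is synchronous and uses only the standard library (no tokio, no anyhow), `parse_line` and `read_messages` are hypothetical names, the payload is trimmed on the left, and malformed lines are reported and skipped.

```rust
use std::io::BufRead;

#[derive(Debug, PartialEq)]
struct Message {
    topic: String,
    payload: String,
}

/// Parses one `[topic] payload` line, the format printed by the `print` function above.
/// Returns `Ok(None)` for blank lines.
fn parse_line(line: &str) -> Result<Option<Message>, String> {
    let line = line.trim();
    if line.is_empty() {
        return Ok(None);
    }
    let Some(rest) = line.strip_prefix('[') else {
        return Err(format!("Missing opening bracket: {line}"));
    };
    let Some((topic, payload)) = rest.split_once(']') else {
        return Err(format!("Missing closing bracket: {line}"));
    };
    Ok(Some(Message {
        topic: topic.to_string(),
        // Drop the separator between `]` and the payload.
        payload: payload.trim_start().to_string(),
    }))
}

/// Reads all messages from `input`, skipping blank and malformed lines.
fn read_messages(input: impl BufRead) -> Vec<Message> {
    let mut messages = Vec::new();
    for line in input.lines() {
        let line = match line {
            Ok(line) => line,
            Err(err) => {
                eprintln!("Failed to read input stream: {err}");
                break;
            }
        };
        match parse_line(&line) {
            Ok(Some(message)) => messages.push(message),
            Ok(None) => {}
            Err(err) => eprintln!("Skipping malformed line: {err}"),
        }
    }
    messages
}

fn main() {
    let input = "[te/example/one] {\"text\":\"test message\"}\n\nnot a message\n[te/example/two] 42\n";
    for message in read_messages(input.as_bytes()) {
        println!("[{}] {}", message.topic, message.payload);
    }
}
```

Whether skipping or stopping is the better behaviour for `tedge mapping test` is a design choice for the PR authors; the sketch only makes the difference visible.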