-
Notifications
You must be signed in to change notification settings - Fork 65
poc: Extensible Mapper using JavaScript #3650
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
didier-wenzek
wants to merge
53
commits into
thin-edge:main
Choose a base branch
from
didier-wenzek:poc/tedge-quickjs-mapper
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
53 commits
Select commit
Hold shift + click to select a range
2d5770f
Add a new mapper: tedge-gen-mapper
didier-wenzek cf18156
Sketch message transformation pipelines
didier-wenzek 469b0f3
Run JS transformation stages
didier-wenzek a63206f
Update deprecated call tempfile into_path -> keep
didier-wenzek f092635
Avoid to deserialize MQTT at each pipeline stage.
didier-wenzek 5871c21
Add JS example: drop stragglers
didier-wenzek 88ad9c4
Add ability to configure each stages
didier-wenzek cb656fe
Add JS example: collectd pipeline
didier-wenzek a3a140f
Add JS example: thin-edge JSON to c8y JSON
didier-wenzek 5d0ffd1
Fix tests broken by cargo update
didier-wenzek 9a84b39
Update JS filter config using MQTT
didier-wenzek 7f8f966
Filters can defer messages up to the end of a time window
didier-wenzek efde5fe
Tedge-gen-mapper dynamically reloads pipelines and filters
jarhodes314 185076d
Tedge-gen-mapper dynamically subscribes to MQTT topics
didier-wenzek 50038c9
Test JS example: collectd pipeline
didier-wenzek 56480dd
Tidy JS example
jarhodes314 779ff7f
Relay error messages from Javascript to the generic mapper logs
jarhodes314 1c1e9e2
fixup! fixup! Add JS example: collectd pipline
jarhodes314 f96d74b
fixup! fixup! Add JS example: collectd pipline
jarhodes314 0b2ad88
Fix memory leak
didier-wenzek b5faa99
Add benchmark options to tedge mqtt pub
didier-wenzek b97ab44
Add gen-mapper system tests
didier-wenzek 7673515
Add a reference guide
didier-wenzek 2a40620
Implement a circuit breaker filter
didier-wenzek f797861
Add a filter to compute average over a time window
didier-wenzek ed33143
Extract message processing from gen-mapper actor
didier-wenzek b2233a4
Add cli commands to test user-defined mappings
didier-wenzek 8ccf516
Simplify gen-mapper subscription updates
didier-wenzek 883efe8
Gen mapper dynamically load/reload/remove pipelines and filters
didier-wenzek 61ef1e3
Index pipelines using full paths not just filenames
didier-wenzek a7108e3
tedge mapping test consumes messages from stdin
didier-wenzek db0f9a7
Test and fix te_2_c8y.js
didier-wenzek f36568c
Test dynamic update of filter config
didier-wenzek 203f21e
Update system tests Setup
didier-wenzek 024fe6f
Support for testing the tick function of a filter
didier-wenzek 59a3279
Add support for console.log()
didier-wenzek 5850d6e
Tedge mapping test a single single filter
didier-wenzek 5b137e8
Fix JS engine with one module per filter instance
didier-wenzek c26ed7b
use config_dir variable to get configuration file location
reubenmiller ebd8af2
The default config of flow transformation is {}
didier-wenzek 7c2049f
Warn when javascript module and config are not consistent
didier-wenzek d3df515
Trigger ticks every seconds per default when a tick handler is provided
didier-wenzek d23100e
fixup! Warn when javascript module and config are not consistent
didier-wenzek c111bcc
collect message transformation stats
didier-wenzek 34a5fe2
Naming fix: Pipeline -> Flow
didier-wenzek ec21ead
Naming fix: Stage -> Step
didier-wenzek 208986a
Naming fix: Filter -> Flow Step
didier-wenzek 3e83ba4
fixup! Add a new mapper: tedge-gen-mapper
didier-wenzek abe12fe
Concept documentation
didier-wenzek eccec91
zsh:1: command not found: q
didier-wenzek 6641cfc
Set memory usage limits to flow scripts
didier-wenzek 8ff31bf
Flow API: Use more idiomatic Javascript names
didier-wenzek a45942d
Pass processing time along the message
didier-wenzek File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
use crate::cli::mapping::list::ListCommand; | ||
use crate::cli::mapping::test::TestCommand; | ||
use crate::command::BuildCommand; | ||
use crate::command::Command; | ||
use crate::ConfigError; | ||
use anyhow::anyhow; | ||
use anyhow::Context; | ||
use anyhow::Error; | ||
use std::path::PathBuf; | ||
use tedge_config::TEdgeConfig; | ||
use tedge_gen_mapper::flow::Message; | ||
use tedge_gen_mapper::MessageProcessor; | ||
|
||
#[derive(clap::Subcommand, Debug)] | ||
pub enum TEdgeMappingCli { | ||
/// List flows and steps | ||
List { | ||
/// Path to the directory of flows and steps | ||
/// | ||
/// Default to /etc/tedge/gen-mapper | ||
#[clap(long)] | ||
mapping_dir: Option<PathBuf>, | ||
|
||
/// List flows processing messages published on this topic | ||
/// | ||
/// If none is provided, lists all the flows | ||
#[clap(long)] | ||
topic: Option<String>, | ||
}, | ||
|
||
/// Process message samples | ||
Test { | ||
/// Path to the directory of flows and steps | ||
/// | ||
/// Default to /etc/tedge/gen-mapper | ||
#[clap(long)] | ||
mapping_dir: Option<PathBuf>, | ||
|
||
/// Path to the flow step script or TOML flow definition | ||
/// | ||
/// If none is provided, applies all the matching flows | ||
#[clap(long)] | ||
flow: Option<PathBuf>, | ||
|
||
/// Send a tick after all the message samples | ||
#[clap(long = "final-tick")] | ||
final_tick: bool, | ||
|
||
/// Topic of the message sample | ||
/// | ||
/// If none is provided, messages are read from stdin expecting a line per message: | ||
/// [topic] payload | ||
topic: Option<String>, | ||
|
||
/// Payload of the message sample | ||
/// | ||
/// If none is provided, payloads are read from stdin | ||
payload: Option<String>, | ||
}, | ||
} | ||
|
||
impl BuildCommand for TEdgeMappingCli { | ||
fn build_command(self, config: &TEdgeConfig) -> Result<Box<dyn Command>, ConfigError> { | ||
match self { | ||
TEdgeMappingCli::List { mapping_dir, topic } => { | ||
let mapping_dir = mapping_dir.unwrap_or_else(|| Self::default_mapping_dir(config)); | ||
Ok(ListCommand { mapping_dir, topic }.into_boxed()) | ||
} | ||
|
||
TEdgeMappingCli::Test { | ||
mapping_dir, | ||
flow, | ||
final_tick, | ||
topic, | ||
payload, | ||
} => { | ||
let mapping_dir = mapping_dir.unwrap_or_else(|| Self::default_mapping_dir(config)); | ||
let message = match (topic, payload) { | ||
(Some(topic), Some(payload)) => Some(Message { topic, payload }), | ||
(Some(_), None) => Err(anyhow!("Missing sample payload"))?, | ||
(None, Some(_)) => Err(anyhow!("Missing sample topic"))?, | ||
(None, None) => None, | ||
}; | ||
Ok(TestCommand { | ||
mapping_dir, | ||
flow, | ||
message, | ||
final_tick, | ||
} | ||
.into_boxed()) | ||
} | ||
} | ||
} | ||
} | ||
|
||
impl TEdgeMappingCli { | ||
fn default_mapping_dir(config: &TEdgeConfig) -> PathBuf { | ||
config.root_dir().join("gen-mapper").into() | ||
} | ||
|
||
pub async fn load_flows(mapping_dir: &PathBuf) -> Result<MessageProcessor, Error> { | ||
MessageProcessor::try_new(mapping_dir) | ||
.await | ||
.with_context(|| format!("loading flows and steps from {}", mapping_dir.display())) | ||
} | ||
|
||
pub async fn load_file( | ||
mapping_dir: &PathBuf, | ||
path: &PathBuf, | ||
) -> Result<MessageProcessor, Error> { | ||
if let Some("toml") = path.extension().and_then(|s| s.to_str()) { | ||
MessageProcessor::try_new_single_flow(mapping_dir, path) | ||
.await | ||
.with_context(|| format!("loading flow {flow}", flow = path.display())) | ||
} else { | ||
MessageProcessor::try_new_single_step_flow(mapping_dir, path) | ||
.await | ||
.with_context(|| format!("loading flow script {script}", script = path.display())) | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,47 @@ | ||
use crate::cli::mapping::TEdgeMappingCli; | ||
use crate::command::Command; | ||
use crate::log::MaybeFancy; | ||
use anyhow::Error; | ||
use std::path::PathBuf; | ||
use tedge_config::TEdgeConfig; | ||
use tedge_gen_mapper::flow::Flow; | ||
|
||
pub struct ListCommand { | ||
pub mapping_dir: PathBuf, | ||
pub topic: Option<String>, | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Command for ListCommand { | ||
fn description(&self) -> String { | ||
format!( | ||
"list flows and flow steps in {:}", | ||
self.mapping_dir.display() | ||
) | ||
} | ||
|
||
async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> { | ||
let processor = TEdgeMappingCli::load_flows(&self.mapping_dir).await?; | ||
|
||
match &self.topic { | ||
Some(topic) => processor | ||
.flows | ||
.iter() | ||
.filter(|(_, flow)| flow.topics().accept_topic_name(topic)) | ||
.for_each(Self::display), | ||
|
||
None => processor.flows.iter().for_each(Self::display), | ||
} | ||
|
||
Ok(()) | ||
} | ||
} | ||
|
||
impl ListCommand { | ||
fn display((flow_id, flow): (&String, &Flow)) { | ||
println!("{flow_id}"); | ||
for step in flow.steps.iter() { | ||
println!("\t{}", step.script.path.display()); | ||
} | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,5 @@ | ||
mod cli; | ||
mod list; | ||
mod test; | ||
|
||
pub use cli::TEdgeMappingCli; |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
use crate::cli::mapping::TEdgeMappingCli; | ||
use crate::command::Command; | ||
use crate::log::MaybeFancy; | ||
use anyhow::Error; | ||
use std::path::PathBuf; | ||
use tedge_config::TEdgeConfig; | ||
use tedge_gen_mapper::flow::*; | ||
use tedge_gen_mapper::MessageProcessor; | ||
use tokio::io::AsyncBufReadExt; | ||
use tokio::io::BufReader; | ||
use tokio::io::Stdin; | ||
|
||
pub struct TestCommand { | ||
pub mapping_dir: PathBuf, | ||
pub flow: Option<PathBuf>, | ||
pub message: Option<Message>, | ||
pub final_tick: bool, | ||
} | ||
|
||
#[async_trait::async_trait] | ||
impl Command for TestCommand { | ||
fn description(&self) -> String { | ||
format!( | ||
"process message samples using flows and steps in {:}", | ||
self.mapping_dir.display() | ||
) | ||
} | ||
|
||
async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> { | ||
let mut processor = match &self.flow { | ||
None => TEdgeMappingCli::load_flows(&self.mapping_dir).await?, | ||
Some(flow) => TEdgeMappingCli::load_file(&self.mapping_dir, flow).await?, | ||
}; | ||
if let Some(message) = &self.message { | ||
let timestamp = DateTime::now(); | ||
self.process(&mut processor, message, ×tamp).await; | ||
} else { | ||
let mut stdin = BufReader::new(tokio::io::stdin()); | ||
while let Some(message) = next_message(&mut stdin).await { | ||
let timestamp = DateTime::now(); | ||
self.process(&mut processor, &message, ×tamp).await; | ||
} | ||
} | ||
if self.final_tick { | ||
let timestamp = DateTime::now(); | ||
self.tick(&mut processor, ×tamp).await; | ||
} | ||
Ok(()) | ||
} | ||
} | ||
|
||
impl TestCommand { | ||
async fn process( | ||
&self, | ||
processor: &mut MessageProcessor, | ||
message: &Message, | ||
timestamp: &DateTime, | ||
) { | ||
processor | ||
.process(timestamp, message) | ||
.await | ||
.into_iter() | ||
.map(|(_, v)| v) | ||
.for_each(print) | ||
} | ||
|
||
async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) { | ||
processor | ||
.tick(timestamp) | ||
.await | ||
.into_iter() | ||
.map(|(_, v)| v) | ||
.for_each(print) | ||
} | ||
} | ||
|
||
fn print(messages: Result<Vec<Message>, FlowError>) { | ||
match messages { | ||
Ok(messages) => { | ||
for message in messages { | ||
println!("[{}] {}", message.topic, message.payload); | ||
} | ||
} | ||
Err(err) => { | ||
eprintln!("Error: {}", err) | ||
} | ||
} | ||
} | ||
|
||
fn parse(line: String) -> Result<Option<Message>, Error> { | ||
let line = line.trim(); | ||
if line.is_empty() { | ||
return Ok(None); | ||
} | ||
if !line.starts_with("[") { | ||
return Err(anyhow::anyhow!("Missing opening bracket: {}", line)); | ||
} | ||
let Some(closing_bracket) = line.find(']') else { | ||
return Err(anyhow::anyhow!("Missing closing bracket: {}", line)); | ||
}; | ||
|
||
let topic = line[1..closing_bracket].to_string(); | ||
let payload = line[closing_bracket + 1..].to_string(); | ||
|
||
Ok(Some(Message { topic, payload })) | ||
} | ||
|
||
async fn next_line(input: &mut BufReader<Stdin>) -> Option<String> { | ||
loop { | ||
let mut line = String::new(); | ||
match input.read_line(&mut line).await { | ||
Ok(0) => return None, | ||
Ok(_) => { | ||
let line = line.trim(); | ||
if !line.is_empty() { | ||
return Some(line.to_string()); | ||
} | ||
} | ||
Err(err) => { | ||
eprintln!("Fail to read input stream {}", err); | ||
return None; | ||
} | ||
} | ||
} | ||
} | ||
async fn next_message(input: &mut BufReader<Stdin>) -> Option<Message> { | ||
let line = next_line(input).await?; | ||
match parse(line) { | ||
Ok(message) => message, | ||
Err(err) => { | ||
eprintln!("Fail to parse input message {}", err); | ||
None | ||
} | ||
} | ||
} |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This nomenclature can be slightly confusing as on the command line,
[xxx]
generally means "optional", where as the topic is mandatory.Though I assume
[]
was chosen because that is how we format the messages on the console fromtedge mqtt sub
. So given that, maybe we can just provide an example in addition to the syntax,e.g.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, the bracketed topic syntax has been chosen to be able to pipe
tedge mqtt sub
intotedge mapping test
.However, if the topic is provided on the command line then no brackets are expected: