Skip to content

Commit 208986a

Browse files
committed
Naming fix: Filter -> Flow Step
In some context `Flow script` is more appropriate. For instance, when two steps use the same script. Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent ec21ead commit 208986a

File tree

20 files changed

+189
-187
lines changed

20 files changed

+189
-187
lines changed

crates/core/tedge/src/cli/mapping/cli.rs

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ use tedge_gen_mapper::MessageProcessor;
1313

1414
#[derive(clap::Subcommand, Debug)]
1515
pub enum TEdgeMappingCli {
16-
/// List flows and filters
16+
/// List flows and steps
1717
List {
18-
/// Path to flow and filter specs
18+
/// Path to the directory of flows and steps
1919
///
2020
/// Default to /etc/tedge/gen-mapper
2121
#[clap(long)]
@@ -30,17 +30,17 @@ pub enum TEdgeMappingCli {
3030

3131
/// Process message samples
3232
Test {
33-
/// Path to flow and filter specs
33+
/// Path to the directory of flows and steps
3434
///
3535
/// Default to /etc/tedge/gen-mapper
3636
#[clap(long)]
3737
mapping_dir: Option<PathBuf>,
3838

39-
/// Path to the javascript filter or TOML flow definition
39+
/// Path to the flow step script or TOML flow definition
4040
///
4141
/// If none is provided, applies all the matching flows
4242
#[clap(long)]
43-
filter: Option<PathBuf>,
43+
flow: Option<PathBuf>,
4444

4545
/// Send a tick after all the message samples
4646
#[clap(long = "final-tick")]
@@ -69,7 +69,7 @@ impl BuildCommand for TEdgeMappingCli {
6969

7070
TEdgeMappingCli::Test {
7171
mapping_dir,
72-
filter,
72+
flow,
7373
final_tick,
7474
topic,
7575
payload,
@@ -83,7 +83,7 @@ impl BuildCommand for TEdgeMappingCli {
8383
};
8484
Ok(TestCommand {
8585
mapping_dir,
86-
filter,
86+
flow,
8787
message,
8888
final_tick,
8989
}
@@ -101,10 +101,10 @@ impl TEdgeMappingCli {
101101
pub async fn load_flows(mapping_dir: &PathBuf) -> Result<MessageProcessor, Error> {
102102
MessageProcessor::try_new(mapping_dir)
103103
.await
104-
.with_context(|| format!("loading flows and filters from {}", mapping_dir.display()))
104+
.with_context(|| format!("loading flows and steps from {}", mapping_dir.display()))
105105
}
106106

107-
pub async fn load_filter(
107+
pub async fn load_file(
108108
mapping_dir: &PathBuf,
109109
path: &PathBuf,
110110
) -> Result<MessageProcessor, Error> {
@@ -113,9 +113,9 @@ impl TEdgeMappingCli {
113113
.await
114114
.with_context(|| format!("loading flow {flow}", flow = path.display()))
115115
} else {
116-
MessageProcessor::try_new_single_filter(mapping_dir, path)
116+
MessageProcessor::try_new_single_step_flow(mapping_dir, path)
117117
.await
118-
.with_context(|| format!("loading filter {filter}", filter = path.display()))
118+
.with_context(|| format!("loading flow script {script}", script = path.display()))
119119
}
120120
}
121121
}

crates/core/tedge/src/cli/mapping/list.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl ListCommand {
4141
fn display((flow_id, flow): (&String, &Flow)) {
4242
println!("{flow_id}");
4343
for step in flow.steps.iter() {
44-
println!("\t{}", step.filter.path.display());
44+
println!("\t{}", step.script.path.display());
4545
}
4646
}
4747
}

crates/core/tedge/src/cli/mapping/test.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,7 @@ use tokio::io::Stdin;
1212

1313
pub struct TestCommand {
1414
pub mapping_dir: PathBuf,
15-
pub filter: Option<PathBuf>,
15+
pub flow: Option<PathBuf>,
1616
pub message: Option<Message>,
1717
pub final_tick: bool,
1818
}
@@ -21,15 +21,15 @@ pub struct TestCommand {
2121
impl Command for TestCommand {
2222
fn description(&self) -> String {
2323
format!(
24-
"process message samples using flows and filters in {:}",
24+
"process message samples using flows and steps in {:}",
2525
self.mapping_dir.display()
2626
)
2727
}
2828

2929
async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
30-
let mut processor = match &self.filter {
30+
let mut processor = match &self.flow {
3131
None => TEdgeMappingCli::load_flows(&self.mapping_dir).await?,
32-
Some(filter) => TEdgeMappingCli::load_filter(&self.mapping_dir, filter).await?,
32+
Some(flow) => TEdgeMappingCli::load_file(&self.mapping_dir, flow).await?,
3333
};
3434
if let Some(message) = &self.message {
3535
let timestamp = DateTime::now();
@@ -74,7 +74,7 @@ impl TestCommand {
7474
}
7575
}
7676

77-
fn print(messages: Result<Vec<Message>, FilterError>) {
77+
fn print(messages: Result<Vec<Message>, FlowError>) {
7878
match messages {
7979
Ok(messages) => {
8080
for message in messages {

crates/extensions/tedge_gen_mapper/flows/circuit-breaker.js

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
1-
// A filter that let messages go through, unless too many messages are received within a given period
1+
// A flow step that let messages go through, unless too many messages are received within a given period
22
//
3-
// This filter is configured by the following settings:
3+
// This flow step is configured by the following settings:
44
// - tick_every_seconds: the frequency at which the sliding window is moved
55
// - tick_count: size of the time windows
66
// - too_many: how many messages is too many (received during the last tick_count*tick_every_seconds seconds)
7-
// - back_to_normal: how many messages is okay to reactivate the filter if bellow
7+
// - back_to_normal: how many messages is okay to reactivate the flow step if bellow
88
// - message_on_too_many: message sent when the upper threshold is crossed
99
// - message_on_back_to_normal: message sent when the lower threshold is crossed
1010
// - stats_topic: topic for statistic messages
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
input_topics = ["collectd/+/+/+"]
22

33
steps = [
4-
{ filter = "collectd-to-te.js" },
5-
{ filter = "average.js", tick_every_seconds = 10 }
4+
{ script = "collectd-to-te.js" },
5+
{ script = "average.js", tick_every_seconds = 10 }
66
]

crates/extensions/tedge_gen_mapper/flows/loop.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,6 @@
22
input_topics = ["loopback/#"]
33

44
steps = [
5-
{ filter = "add_timestamp.js" },
6-
{ filter = "circuit-breaker.js", tick_every_seconds = 1, config = { stats_topic = "te/error", too_many = 10000, message_on_too_many = { topic = "te/device/main///a/too-many-messages", payload = "too many messages" }, message_on_back_to_normal = { topic = "te/device/main///a/too-many-messages", payload = "back to normal" } } }
5+
{ script = "add_timestamp.js" },
6+
{ script = "circuit-breaker.js", tick_every_seconds = 1, config = { stats_topic = "te/error", too_many = 10000, message_on_too_many = { topic = "te/device/main///a/too-many-messages", payload = "too many messages" }, message_on_back_to_normal = { topic = "te/device/main///a/too-many-messages", payload = "back to normal" } } }
77
]
Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
input_topics = ["te/+/+/+/+/m/+"]
22

33
steps = [
4-
{ filter = "add_timestamp.js" },
5-
{ filter = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
4+
{ script = "add_timestamp.js" },
5+
{ script = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
66
]

crates/extensions/tedge_gen_mapper/src/actor.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ impl Actor for GenMapper {
4141
message = self.messages.recv() => {
4242
match message {
4343
Some(InputMessage::MqttMessage(message)) => match Message::try_from(message) {
44-
Ok(message) => self.filter(message).await?,
44+
Ok(message) => self.process(message).await?,
4545
Err(err) => {
4646
error!(target: "gen-mapper", "Cannot process message: {err}");
4747
}
@@ -51,7 +51,7 @@ impl Actor for GenMapper {
5151
continue;
5252
};
5353
if matches!(path.extension(), Some("js" | "ts")) {
54-
self.processor.reload_filter(path).await;
54+
self.processor.reload_script(path).await;
5555
} else if path.extension() == Some("toml") {
5656
self.processor.reload_flow(path).await;
5757
self.send_updated_subscriptions().await?;
@@ -71,7 +71,7 @@ impl Actor for GenMapper {
7171
continue;
7272
};
7373
if matches!(path.extension(), Some("js" | "ts")) {
74-
self.processor.remove_filter(path).await;
74+
self.processor.remove_script(path).await;
7575
} else if path.extension() == Some("toml") {
7676
self.processor.remove_flow(path).await;
7777
self.send_updated_subscriptions().await?;
@@ -102,7 +102,7 @@ impl GenMapper {
102102
diff
103103
}
104104

105-
async fn filter(&mut self, message: Message) -> Result<(), RuntimeError> {
105+
async fn process(&mut self, message: Message) -> Result<(), RuntimeError> {
106106
let timestamp = DateTime::now();
107107
for (flow_id, flow_messages) in self.processor.process(&timestamp, &message).await {
108108
match flow_messages {

crates/extensions/tedge_gen_mapper/src/config.rs

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
use crate::flow::Flow;
22
use crate::flow::FlowStep;
3-
use crate::js_filter::JsFilter;
43
use crate::js_runtime::JsRuntime;
4+
use crate::js_script::JsScript;
55
use crate::LoadError;
66
use camino::Utf8Path;
77
use camino::Utf8PathBuf;
@@ -19,7 +19,7 @@ pub struct FlowConfig {
1919

2020
#[derive(Deserialize)]
2121
pub struct StepConfig {
22-
filter: FilterSpec,
22+
script: ScriptSpec,
2323

2424
#[serde(default)]
2525
config: Option<Value>,
@@ -33,7 +33,7 @@ pub struct StepConfig {
3333

3434
#[derive(Deserialize)]
3535
#[serde(untagged)]
36-
pub enum FilterSpec {
36+
pub enum ScriptSpec {
3737
JavaScript(Utf8PathBuf),
3838
}
3939

@@ -47,10 +47,10 @@ pub enum ConfigError {
4747
}
4848

4949
impl FlowConfig {
50-
pub fn from_filter(filter: Utf8PathBuf) -> Self {
50+
pub fn from_step(script: Utf8PathBuf) -> Self {
5151
let input_topic = "#".to_string();
5252
let step = StepConfig {
53-
filter: FilterSpec::JavaScript(filter),
53+
script: ScriptSpec::JavaScript(script),
5454
config: None,
5555
tick_every_seconds: 0,
5656
meta_topics: vec![],
@@ -71,7 +71,7 @@ impl FlowConfig {
7171
let mut steps = vec![];
7272
for (i, step) in self.steps.into_iter().enumerate() {
7373
let mut step = step.compile(config_dir, i, &source).await?;
74-
js_runtime.load_filter(&mut step.filter).await?;
74+
js_runtime.load_script(&mut step.script).await?;
7575
step.check(&source);
7676
step.fix();
7777
steps.push(step);
@@ -91,17 +91,17 @@ impl StepConfig {
9191
index: usize,
9292
flow: &Utf8Path,
9393
) -> Result<FlowStep, ConfigError> {
94-
let path = match self.filter {
95-
FilterSpec::JavaScript(path) if path.is_absolute() => path.into(),
96-
FilterSpec::JavaScript(path) if path.starts_with(config_dir) => path.into(),
97-
FilterSpec::JavaScript(path) => config_dir.join(path),
94+
let path = match self.script {
95+
ScriptSpec::JavaScript(path) if path.is_absolute() => path.into(),
96+
ScriptSpec::JavaScript(path) if path.starts_with(config_dir) => path.into(),
97+
ScriptSpec::JavaScript(path) => config_dir.join(path),
9898
};
99-
let filter = JsFilter::new(flow.to_owned().into(), index, path)
99+
let script = JsScript::new(flow.to_owned().into(), index, path)
100100
.with_config(self.config)
101101
.with_tick_every_seconds(self.tick_every_seconds);
102102
let config_topics = topic_filters(&self.meta_topics)?;
103103
Ok(FlowStep {
104-
filter,
104+
script,
105105
config_topics,
106106
})
107107
}

0 commit comments

Comments
 (0)