Skip to content

Commit 8aa71c7

Browse files
committed
Naming fix: Stage -> Step
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent c31c068 commit 8aa71c7

File tree

11 files changed

+55
-52
lines changed

11 files changed

+55
-52
lines changed

crates/core/tedge/src/cli/mapping/list.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,10 @@ pub struct ListCommand {
1414
#[async_trait::async_trait]
1515
impl Command for ListCommand {
1616
fn description(&self) -> String {
17-
format!("list flows and filters in {:}", self.mapping_dir.display())
17+
format!(
18+
"list flows and flow steps in {:}",
19+
self.mapping_dir.display()
20+
)
1821
}
1922

2023
async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
@@ -37,8 +40,8 @@ impl Command for ListCommand {
3740
impl ListCommand {
3841
fn display((flow_id, flow): (&String, &Flow)) {
3942
println!("{flow_id}");
40-
for stage in flow.stages.iter() {
41-
println!("\t{}", stage.filter.path.display());
43+
for step in flow.steps.iter() {
44+
println!("\t{}", step.filter.path.display());
4245
}
4346
}
4447
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
input_topics = ["collectd/+/+/+"]
22

3-
stages = [
3+
steps = [
44
{ filter = "collectd-to-te.js" },
55
{ filter = "average.js", tick_every_seconds = 10 }
66
]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# This flow is on purpose looping: the messages are published to the same topic
22
input_topics = ["loopback/#"]
33

4-
stages = [
4+
steps = [
55
{ filter = "add_timestamp.js" },
66
{ filter = "circuit-breaker.js", tick_every_seconds = 1, config = { stats_topic = "te/error", too_many = 10000, message_on_too_many = { topic = "te/device/main///a/too-many-messages", payload = "too many messages" }, message_on_back_to_normal = { topic = "te/device/main///a/too-many-messages", payload = "back to normal" } } }
77
]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
input_topics = ["te/+/+/+/+/m/+"]
22

3-
stages = [
3+
steps = [
44
{ filter = "add_timestamp.js" },
55
{ filter = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
66
]

crates/extensions/tedge_gen_mapper/src/config.rs

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::flow::Flow;
2-
use crate::flow::Stage;
2+
use crate::flow::FlowStep;
33
use crate::js_filter::JsFilter;
44
use crate::js_runtime::JsRuntime;
55
use crate::LoadError;
@@ -14,11 +14,11 @@ use tedge_mqtt_ext::TopicFilter;
1414
#[derive(Deserialize)]
1515
pub struct FlowConfig {
1616
input_topics: Vec<String>,
17-
stages: Vec<StageConfig>,
17+
steps: Vec<StepConfig>,
1818
}
1919

2020
#[derive(Deserialize)]
21-
pub struct StageConfig {
21+
pub struct StepConfig {
2222
filter: FilterSpec,
2323

2424
#[serde(default)]
@@ -49,15 +49,15 @@ pub enum ConfigError {
4949
impl FlowConfig {
5050
pub fn from_filter(filter: Utf8PathBuf) -> Self {
5151
let input_topic = "#".to_string();
52-
let stage = StageConfig {
52+
let step = StepConfig {
5353
filter: FilterSpec::JavaScript(filter),
5454
config: None,
5555
tick_every_seconds: 0,
5656
meta_topics: vec![],
5757
};
5858
Self {
5959
input_topics: vec![input_topic],
60-
stages: vec![stage],
60+
steps: vec![step],
6161
}
6262
}
6363

@@ -68,29 +68,29 @@ impl FlowConfig {
6868
source: Utf8PathBuf,
6969
) -> Result<Flow, ConfigError> {
7070
let input_topics = topic_filters(&self.input_topics)?;
71-
let mut stages = vec![];
72-
for (i, stage) in self.stages.into_iter().enumerate() {
73-
let mut stage = stage.compile(config_dir, i, &source).await?;
74-
js_runtime.load_filter(&mut stage.filter).await?;
75-
stage.check(&source);
76-
stage.fix();
77-
stages.push(stage);
71+
let mut steps = vec![];
72+
for (i, step) in self.steps.into_iter().enumerate() {
73+
let mut step = step.compile(config_dir, i, &source).await?;
74+
js_runtime.load_filter(&mut step.filter).await?;
75+
step.check(&source);
76+
step.fix();
77+
steps.push(step);
7878
}
7979
Ok(Flow {
8080
input_topics,
81-
stages,
81+
steps,
8282
source,
8383
})
8484
}
8585
}
8686

87-
impl StageConfig {
87+
impl StepConfig {
8888
pub async fn compile(
8989
self,
9090
config_dir: &Path,
9191
index: usize,
9292
flow: &Utf8Path,
93-
) -> Result<Stage, ConfigError> {
93+
) -> Result<FlowStep, ConfigError> {
9494
let path = match self.filter {
9595
FilterSpec::JavaScript(path) if path.is_absolute() => path.into(),
9696
FilterSpec::JavaScript(path) if path.starts_with(config_dir) => path.into(),
@@ -100,7 +100,7 @@ impl StageConfig {
100100
.with_config(self.config)
101101
.with_tick_every_seconds(self.tick_every_seconds);
102102
let config_topics = topic_filters(&self.meta_topics)?;
103-
Ok(Stage {
103+
Ok(FlowStep {
104104
filter,
105105
config_topics,
106106
})

crates/extensions/tedge_gen_mapper/src/flow.rs

Lines changed: 18 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -16,14 +16,14 @@ pub struct Flow {
1616
/// The source topics
1717
pub input_topics: TopicFilter,
1818

19-
/// Transformation stages to apply in order to the messages
20-
pub stages: Vec<Stage>,
19+
/// Transformation steps to apply in order to the messages
20+
pub steps: Vec<FlowStep>,
2121

2222
pub source: Utf8PathBuf,
2323
}
2424

25-
/// A message transformation stage
26-
pub struct Stage {
25+
/// A message transformation step
26+
pub struct FlowStep {
2727
pub filter: JsFilter,
2828
pub config_topics: TopicFilter,
2929
}
@@ -55,8 +55,8 @@ pub enum FilterError {
5555
impl Flow {
5656
pub fn topics(&self) -> TopicFilter {
5757
let mut topics = self.input_topics.clone();
58-
for stage in self.stages.iter() {
59-
topics.add_all(stage.config_topics.clone())
58+
for step in self.steps.iter() {
59+
topics.add_all(step.config_topics.clone())
6060
}
6161
topics
6262
}
@@ -66,9 +66,9 @@ impl Flow {
6666
js_runtime: &JsRuntime,
6767
message: &Message,
6868
) -> Result<(), FilterError> {
69-
for stage in self.stages.iter_mut() {
70-
if stage.config_topics.accept_topic_name(&message.topic) {
71-
stage.filter.update_config(js_runtime, message).await?
69+
for step in self.steps.iter_mut() {
70+
if step.config_topics.accept_topic_name(&message.topic) {
71+
step.filter.update_config(js_runtime, message).await?
7272
}
7373
}
7474
Ok(())
@@ -88,12 +88,12 @@ impl Flow {
8888

8989
let stated_at = stats.flow_process_start(self.source.as_str());
9090
let mut messages = vec![message.clone()];
91-
for stage in self.stages.iter() {
92-
let js = stage.filter.source();
91+
for step in self.steps.iter() {
92+
let js = step.filter.source();
9393
let mut transformed_messages = vec![];
9494
for message in messages.iter() {
9595
let filter_started_at = stats.filter_start(&js, "process");
96-
let filter_output = stage.filter.process(js_runtime, timestamp, message).await;
96+
let filter_output = step.filter.process(js_runtime, timestamp, message).await;
9797
match &filter_output {
9898
Ok(messages) => {
9999
stats.filter_done(&js, "process", filter_started_at, messages.len())
@@ -117,13 +117,13 @@ impl Flow {
117117
) -> Result<Vec<Message>, FilterError> {
118118
let stated_at = stats.flow_tick_start(self.source.as_str());
119119
let mut messages = vec![];
120-
for stage in self.stages.iter() {
121-
let js = stage.filter.source();
120+
for step in self.steps.iter() {
121+
let js = step.filter.source();
122122
// Process first the messages triggered upstream by the tick
123123
let mut transformed_messages = vec![];
124124
for message in messages.iter() {
125125
let filter_started_at = stats.filter_start(&js, "process");
126-
let filter_output = stage.filter.process(js_runtime, timestamp, message).await;
126+
let filter_output = step.filter.process(js_runtime, timestamp, message).await;
127127
match &filter_output {
128128
Ok(messages) => {
129129
stats.filter_done(&js, "process", filter_started_at, messages.len())
@@ -135,22 +135,22 @@ impl Flow {
135135

136136
// Only then process the tick
137137
let filter_started_at = stats.filter_start(&js, "tick");
138-
let tick_output = stage.filter.tick(js_runtime, timestamp).await;
138+
let tick_output = step.filter.tick(js_runtime, timestamp).await;
139139
match &tick_output {
140140
Ok(messages) => stats.filter_done(&js, "tick", filter_started_at, messages.len()),
141141
Err(_) => stats.filter_failed(&js, "tick"),
142142
}
143143
transformed_messages.extend(tick_output?);
144144

145-
// Iterate with all the messages collected at this stage
145+
// Iterate with all the messages collected at this step
146146
messages = transformed_messages;
147147
}
148148
stats.flow_tick_done(self.source.as_str(), stated_at, messages.len());
149149
Ok(messages)
150150
}
151151
}
152152

153-
impl Stage {
153+
impl FlowStep {
154154
pub(crate) fn check(&self, flow: &Utf8Path) {
155155
let filter = &self.filter;
156156
if filter.no_js_process {

crates/extensions/tedge_gen_mapper/src/runtime.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -142,9 +142,9 @@ impl MessageProcessor {
142142

143143
pub async fn reload_filter(&mut self, path: Utf8PathBuf) {
144144
for flow in self.flows.values_mut() {
145-
for stage in &mut flow.stages {
146-
if stage.filter.path() == path {
147-
match self.js_runtime.load_filter(&mut stage.filter).await {
145+
for step in &mut flow.steps {
146+
if step.filter.path() == path {
147+
match self.js_runtime.load_filter(&mut step.filter).await {
148148
Ok(()) => {
149149
info!(target: "gen-mapper", "Reloaded filter {path}");
150150
}
@@ -160,8 +160,8 @@ impl MessageProcessor {
160160

161161
pub async fn remove_filter(&mut self, path: Utf8PathBuf) {
162162
for (flow_id, flow) in self.flows.iter() {
163-
for stage in flow.stages.iter() {
164-
if stage.filter.path() == path {
163+
for step in flow.steps.iter() {
164+
if step.filter.path() == path {
165165
warn!(target: "gen-mapper", "Removing a filter used by {flow_id}: {path}");
166166
return;
167167
}

docs/src/references/mappers/gen-mapper.md

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -48,14 +48,14 @@ which is used to drive all MQTT message transformations.
4848
- A filter is defined by a Javascript file with `.js` extension.
4949
- The definition of flows must provide a list of MQTT topics to subscribe to.
5050
- The flow will be feed with all the messages received on these topics.
51-
- A flow definition also provides a list of stages.
52-
- Each stage is built from a javascript and is possibly given a config (arbitrary json that will be passed to the script)
53-
- Each stage can also subscribe to a list of MQTT topics (which messages will be passed to the script to update its config)
51+
- A flow definition provides a list of steps.
52+
- Each step is built from a javascript and is possibly given a config (arbitrary json that will be passed to the script)
53+
- Each step can also subscribe to a list of MQTT topics (which messages will be passed to the script to update its config)
5454

5555
```toml
5656
input_topics = ["te/+/+/+/+/m/+"]
5757

58-
stages = [
58+
steps = [
5959
{ filter = "add_timestamp.js" },
6060
{ filter = "drop_stragglers.js", config = { max_delay = 60 } },
6161
{ filter = "te_to_c8y.js", meta_topics = ["te/+/+/+/+/m/+/meta"] }
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
input_topics = ["test/+/+/+/+/e/+"]
22

3-
stages = [
3+
steps = [
44
{ filter = "count-messages.js", config = { topic = "test/count/e" }, tick_every_seconds = 1 },
55
]
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
input_topics = ["test/+/+/+/+/m/+"]
22

3-
stages = [
3+
steps = [
44
{ filter = "count-messages.js", config = { topic = "test/count/m" }, tick_every_seconds = 1 },
55
]

0 commit comments

Comments
 (0)