Skip to content

Commit d798cb8

Browse files
committed
Tedge mapping test a single single filter
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 214ccc3 commit d798cb8

File tree

6 files changed

+148
-63
lines changed

6 files changed

+148
-63
lines changed

crates/core/tedge/src/cli/mapping/cli.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -108,4 +108,19 @@ impl TEdgeMappingCli {
108108
)
109109
})
110110
}
111+
112+
pub async fn load_filter(
113+
mapping_dir: &PathBuf,
114+
path: &PathBuf,
115+
) -> Result<MessageProcessor, Error> {
116+
if let Some("toml") = path.extension().and_then(|s| s.to_str()) {
117+
MessageProcessor::try_new_single_pipeline(mapping_dir, path)
118+
.await
119+
.with_context(|| format!("loading pipeline {pipeline}", pipeline = path.display()))
120+
} else {
121+
MessageProcessor::try_new_single_filter(mapping_dir, path)
122+
.await
123+
.with_context(|| format!("loading filter {filter}", filter = path.display()))
124+
}
125+
}
111126
}

crates/core/tedge/src/cli/mapping/test.rs

Lines changed: 16 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,10 @@ impl Command for TestCommand {
2727
}
2828

2929
async fn execute(&self, _config: TEdgeConfig) -> Result<(), MaybeFancy<Error>> {
30-
let mut processor = TEdgeMappingCli::load_pipelines(&self.mapping_dir).await?;
30+
let mut processor = match &self.filter {
31+
None => TEdgeMappingCli::load_pipelines(&self.mapping_dir).await?,
32+
Some(filter) => TEdgeMappingCli::load_filter(&self.mapping_dir, filter).await?,
33+
};
3134
if let Some(message) = &self.message {
3235
let timestamp = DateTime::now();
3336
self.process(&mut processor, message, &timestamp).await;
@@ -53,37 +56,21 @@ impl TestCommand {
5356
message: &Message,
5457
timestamp: &DateTime,
5558
) {
56-
match &self.filter {
57-
Some(filter) => {
58-
let filter_name = filter.display().to_string();
59-
print(
60-
processor
61-
.process_with_pipeline(&filter_name, timestamp, message)
62-
.await,
63-
)
64-
}
65-
None => processor
66-
.process(timestamp, message)
67-
.await
68-
.into_iter()
69-
.map(|(_, v)| v)
70-
.for_each(print),
71-
}
59+
processor
60+
.process(timestamp, message)
61+
.await
62+
.into_iter()
63+
.map(|(_, v)| v)
64+
.for_each(print)
7265
}
7366

7467
async fn tick(&self, processor: &mut MessageProcessor, timestamp: &DateTime) {
75-
match &self.filter {
76-
Some(filter) => {
77-
let filter_name = filter.display().to_string();
78-
print(processor.tick_with_pipeline(&filter_name, timestamp).await)
79-
}
80-
None => processor
81-
.tick(timestamp)
82-
.await
83-
.into_iter()
84-
.map(|(_, v)| v)
85-
.for_each(print),
86-
}
68+
processor
69+
.tick(timestamp)
70+
.await
71+
.into_iter()
72+
.map(|(_, v)| v)
73+
.for_each(print)
8774
}
8875
}
8976

crates/extensions/tedge_gen_mapper/src/config.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,20 @@ pub enum ConfigError {
4545
}
4646

4747
impl PipelineConfig {
48+
pub fn from_filter(filter: Utf8PathBuf) -> Self {
49+
let input_topic = "#".to_string();
50+
let stage = StageConfig {
51+
filter: FilterSpec::JavaScript(filter),
52+
config: None,
53+
tick_every_seconds: 1,
54+
meta_topics: vec![],
55+
};
56+
Self {
57+
input_topics: vec![input_topic],
58+
stages: vec![stage],
59+
}
60+
}
61+
4862
pub fn compile(
4963
self,
5064
js_runtime: &JsRuntime,
@@ -69,6 +83,7 @@ impl StageConfig {
6983
pub fn compile(self, _js_runtime: &JsRuntime, config_dir: &Path) -> Result<Stage, ConfigError> {
7084
let path = match self.filter {
7185
FilterSpec::JavaScript(path) if path.is_absolute() => path.into(),
86+
FilterSpec::JavaScript(path) if path.starts_with(config_dir) => path.into(),
7287
FilterSpec::JavaScript(path) => config_dir.join(path),
7388
};
7489
let filter = JsFilter::new(path)

crates/extensions/tedge_gen_mapper/src/runtime.rs

Lines changed: 101 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,43 @@ impl MessageProcessor {
4242
})
4343
}
4444

45+
pub async fn try_new_single_pipeline(
46+
config_dir: impl AsRef<Path>,
47+
pipeline: impl AsRef<Path>,
48+
) -> Result<Self, LoadError> {
49+
let config_dir = config_dir.as_ref().to_owned();
50+
let pipeline = pipeline.as_ref().to_owned();
51+
let mut js_runtime = JsRuntime::try_new().await?;
52+
let mut pipeline_specs = PipelineSpecs::default();
53+
pipeline_specs
54+
.load_single_pipeline(&mut js_runtime, &config_dir, &pipeline)
55+
.await;
56+
let pipelines = pipeline_specs.compile(&js_runtime, &config_dir);
57+
Ok(MessageProcessor {
58+
config_dir,
59+
pipelines,
60+
js_runtime,
61+
})
62+
}
63+
64+
pub async fn try_new_single_filter(
65+
config_dir: impl AsRef<Path>,
66+
filter: impl AsRef<Path>,
67+
) -> Result<Self, LoadError> {
68+
let config_dir = config_dir.as_ref().to_owned();
69+
let mut js_runtime = JsRuntime::try_new().await?;
70+
let mut pipeline_specs = PipelineSpecs::default();
71+
pipeline_specs
72+
.load_single_filter(&mut js_runtime, &filter)
73+
.await;
74+
let pipelines = pipeline_specs.compile(&js_runtime, &config_dir);
75+
Ok(MessageProcessor {
76+
config_dir,
77+
pipelines,
78+
js_runtime,
79+
})
80+
}
81+
4582
pub fn subscriptions(&self) -> TopicFilter {
4683
let mut topics = TopicFilter::empty();
4784
for pipeline in self.pipelines.values() {
@@ -63,19 +100,6 @@ impl MessageProcessor {
63100
out_messages
64101
}
65102

66-
pub async fn process_with_pipeline(
67-
&mut self,
68-
pipeline_id: &String,
69-
timestamp: &DateTime,
70-
message: &Message,
71-
) -> Result<Vec<Message>, FilterError> {
72-
let pipeline = self
73-
.pipelines
74-
.get_mut(pipeline_id)
75-
.ok_or_else(|| anyhow::anyhow!("No such pipeline: {pipeline_id}"))?;
76-
pipeline.process(&self.js_runtime, timestamp, message).await
77-
}
78-
79103
pub async fn tick(
80104
&mut self,
81105
timestamp: &DateTime,
@@ -88,18 +112,6 @@ impl MessageProcessor {
88112
out_messages
89113
}
90114

91-
pub async fn tick_with_pipeline(
92-
&mut self,
93-
pipeline_id: &String,
94-
timestamp: &DateTime,
95-
) -> Result<Vec<Message>, FilterError> {
96-
let pipeline = self
97-
.pipelines
98-
.get_mut(pipeline_id)
99-
.ok_or_else(|| anyhow::anyhow!("No such pipeline: {pipeline_id}"))?;
100-
pipeline.tick(&self.js_runtime, timestamp).await
101-
}
102-
103115
pub async fn dump_memory_stats(&self) {
104116
self.js_runtime.dump_memory_stats().await;
105117
}
@@ -235,10 +247,71 @@ impl PipelineSpecs {
235247
}
236248
}
237249

250+
pub async fn load_single_pipeline(
251+
&mut self,
252+
js_runtime: &mut JsRuntime,
253+
config_dir: &PathBuf,
254+
pipeline: &Path,
255+
) {
256+
let Some(path) = Utf8Path::from_path(pipeline).map(|p| p.to_path_buf()) else {
257+
error!(target: "MAPPING", "Skipping non UTF8 path: {}", pipeline.display());
258+
return;
259+
};
260+
if let Err(err) = self.load_pipeline(&path).await {
261+
error!(target: "MAPPING", "Failed to load pipeline {path}: {err}");
262+
return;
263+
}
264+
265+
let Ok(mut entries) = read_dir(config_dir).await.map_err(|err|
266+
error!(target: "MAPPING", "Failed to read filters from {}: {err}", config_dir.display())
267+
) else {
268+
return;
269+
};
270+
271+
while let Ok(Some(entry)) = entries.next_entry().await {
272+
let Some(path) = Utf8Path::from_path(&entry.path()).map(|p| p.to_path_buf()) else {
273+
error!(target: "MAPPING", "Skipping non UTF8 path: {}", entry.path().display());
274+
continue;
275+
};
276+
if let Ok(file_type) = entry.file_type().await {
277+
if file_type.is_file() {
278+
match path.extension() {
279+
Some("js") | Some("ts") => {
280+
info!(target: "MAPPING", "Loading filter: {path}");
281+
if let Err(err) = self.load_filter(js_runtime, path).await {
282+
error!(target: "MAPPING", "Failed to load filter: {err}");
283+
}
284+
}
285+
_ => {}
286+
}
287+
}
288+
}
289+
}
290+
}
291+
292+
pub async fn load_single_filter(
293+
&mut self,
294+
js_runtime: &mut JsRuntime,
295+
filter: impl AsRef<Path>,
296+
) {
297+
let filter = filter.as_ref();
298+
let Some(path) = Utf8Path::from_path(filter).map(|p| p.to_path_buf()) else {
299+
error!(target: "MAPPING", "Skipping non UTF8 path: {}", filter.display());
300+
return;
301+
};
302+
if let Err(err) = js_runtime.load_file(&path).await {
303+
error!(target: "MAPPING", "Failed to load filter {path}: {err}");
304+
}
305+
let pipeline_id = MessageProcessor::pipeline_id(&path);
306+
let pipeline = PipelineConfig::from_filter(path.to_owned());
307+
self.pipeline_specs
308+
.insert(pipeline_id, (path.to_owned(), pipeline));
309+
}
310+
238311
async fn load_pipeline(&mut self, file: impl AsRef<Utf8Path>) -> Result<(), LoadError> {
239312
let path = file.as_ref();
240313
let pipeline_id = MessageProcessor::pipeline_id(path);
241-
let specs = read_to_string(file.as_ref()).await?;
314+
let specs = read_to_string(path).await?;
242315
let pipeline: PipelineConfig = toml::from_str(&specs)?;
243316
self.pipeline_specs
244317
.insert(pipeline_id, (path.to_owned(), pipeline));
@@ -249,9 +322,9 @@ impl PipelineSpecs {
249322
async fn load_filter(
250323
&mut self,
251324
js_runtime: &mut JsRuntime,
252-
file: impl AsRef<Utf8Path>,
325+
file: impl AsRef<Path>,
253326
) -> Result<(), LoadError> {
254-
js_runtime.load_file(file.as_ref()).await?;
327+
js_runtime.load_file(file).await?;
255328
Ok(())
256329
}
257330

tests/RobotFramework/tests/tedge_gen_mapper/pipelines/average.toml

Lines changed: 0 additions & 5 deletions
This file was deleted.

tests/RobotFramework/tests/tedge_gen_mapper/tedge_gen_mapper.robot

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ Units are configured using topic metadata
5050

5151
Computing average over a time window
5252
${transformed_msg} Execute Command
53-
... cat /etc/tedge/gen-mapper/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge mapping test --final-tick
53+
... cat /etc/tedge/gen-mapper/average.samples | awk '{ print $2 }' FS\='INPUT:' | tedge mapping test --final-tick --filter /etc/tedge/gen-mapper/average.js
5454
... strip=True
5555
${expected_msg} Execute Command
5656
... cat /etc/tedge/gen-mapper/average.samples | awk '{ if ($2) print $2 }' FS\='OUTPUT: '

0 commit comments

Comments
 (0)