Skip to content

Commit c2e12a0

Browse files
committed
Warn when javascript module and config are not consistent
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 0f80192 commit c2e12a0

File tree

6 files changed

+81
-18
lines changed

6 files changed

+81
-18
lines changed

crates/common/mqtt_channel/src/topics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,10 @@ impl TopicFilter {
116116
}
117117
}
118118

119+
pub fn is_empty(&self) -> bool {
120+
self.patterns.is_empty()
121+
}
122+
119123
/// Check if the given topic matches this filter pattern.
120124
pub fn accept_topic_name(&self, topic: &str) -> bool {
121125
self.patterns

crates/extensions/tedge_gen_mapper/src/config.rs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,11 +70,9 @@ impl PipelineConfig {
7070
let input_topics = topic_filters(&self.input_topics)?;
7171
let mut stages = vec![];
7272
for (i, stage) in self.stages.into_iter().enumerate() {
73-
let stage = stage.compile(config_dir, i, &source).await?;
74-
let filter = &stage.filter;
75-
js_runtime
76-
.load_file(filter.module_name(), filter.path())
77-
.await?;
73+
let mut stage = stage.compile(config_dir, i, &source).await?;
74+
js_runtime.load_filter(&mut stage.filter).await?;
75+
stage.check(&source);
7876
stages.push(stage);
7977
}
8078
Ok(Pipeline {

crates/extensions/tedge_gen_mapper/src/js_filter.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,9 @@ pub struct JsFilter {
1818
pub path: PathBuf,
1919
pub config: JsonValue,
2020
pub tick_every_seconds: u64,
21+
pub no_js_process: bool,
22+
pub no_js_update_config: bool,
23+
pub no_js_tick: bool,
2124
}
2225

2326
#[derive(Clone, Debug)]
@@ -37,6 +40,9 @@ impl JsFilter {
3740
path,
3841
config: JsonValue::default(),
3942
tick_every_seconds: 0,
43+
no_js_process: true,
44+
no_js_update_config: true,
45+
no_js_tick: true,
4046
}
4147
}
4248

@@ -81,6 +87,10 @@ impl JsFilter {
8187
message: &Message,
8288
) -> Result<Vec<Message>, FilterError> {
8389
debug!(target: "MAPPING", "{}: process({timestamp:?}, {message:?})", self.module_name());
90+
if self.no_js_process {
91+
return Ok(vec![message.clone()]);
92+
}
93+
8494
let input = vec![
8595
timestamp.clone().into(),
8696
message.clone().into(),
@@ -105,6 +115,10 @@ impl JsFilter {
105115
message: &Message,
106116
) -> Result<(), FilterError> {
107117
debug!(target: "MAPPING", "{}: update_config({message:?})", self.module_name());
118+
if self.no_js_update_config {
119+
return Ok(());
120+
}
121+
108122
let input = vec![message.clone().into(), self.config.clone()];
109123
let config = js
110124
.call_function(&self.module_name(), "update_config", input)
@@ -126,6 +140,9 @@ impl JsFilter {
126140
js: &JsRuntime,
127141
timestamp: &DateTime,
128142
) -> Result<Vec<Message>, FilterError> {
143+
if self.no_js_tick {
144+
return Ok(vec![]);
145+
}
129146
if !timestamp.tick_now(self.tick_every_seconds) {
130147
return Ok(vec![]);
131148
}

crates/extensions/tedge_gen_mapper/src/js_runtime.rs

Lines changed: 39 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,4 @@
1+
use crate::js_filter::JsFilter;
12
use crate::js_filter::JsonValue;
23
use crate::LoadError;
34
use anyhow::anyhow;
@@ -23,11 +24,24 @@ impl JsRuntime {
2324
Ok(JsRuntime { runtime, worker })
2425
}
2526

27+
pub async fn load_filter(&mut self, filter: &mut JsFilter) -> Result<(), LoadError> {
28+
let exports = self.load_file(filter.module_name(), filter.path()).await?;
29+
for export in exports {
30+
match export {
31+
"process" => filter.no_js_process = false,
32+
"update_config" => filter.no_js_update_config = false,
33+
"tick" => filter.no_js_tick = false,
34+
_ => (),
35+
}
36+
}
37+
Ok(())
38+
}
39+
2640
pub async fn load_file(
2741
&mut self,
2842
module_name: String,
2943
path: impl AsRef<Path>,
30-
) -> Result<(), LoadError> {
44+
) -> Result<Vec<&'static str>, LoadError> {
3145
let path = path.as_ref();
3246
let source = tokio::fs::read_to_string(path).await?;
3347
self.load_js(module_name, source).await
@@ -37,13 +51,15 @@ impl JsRuntime {
3751
&mut self,
3852
name: String,
3953
source: impl Into<Vec<u8>>,
40-
) -> Result<(), LoadError> {
54+
) -> Result<Vec<&'static str>, LoadError> {
4155
let (sender, receiver) = oneshot::channel();
4256
let source = source.into();
57+
let imports = vec!["process", "update_config", "tick"];
4358
self.worker
4459
.send(JsRequest::LoadModule {
4560
name,
4661
source,
62+
imports,
4763
sender,
4864
})
4965
.await
@@ -87,7 +103,8 @@ enum JsRequest {
87103
LoadModule {
88104
name: String,
89105
source: Vec<u8>,
90-
sender: oneshot::Sender<Result<(), LoadError>>,
106+
imports: Vec<&'static str>,
107+
sender: oneshot::Sender<Result<Vec<&'static str>, LoadError>>,
91108
},
92109
CallFunction {
93110
module: String,
@@ -118,8 +135,8 @@ impl JsWorker {
118135
let mut modules = JsModules::new();
119136
while let Some(request) = self.requests.recv().await {
120137
match request {
121-
JsRequest::LoadModule{name, source, sender} => {
122-
let result = modules.load_module(ctx.clone(), name, source).await;
138+
JsRequest::LoadModule{name, source, sender, imports} => {
139+
let result = modules.load_module(ctx.clone(), name, source, imports).await;
123140
let _ = sender.send(result);
124141
}
125142
JsRequest::CallFunction{module, function, args, sender} => {
@@ -149,13 +166,24 @@ impl<'js> JsModules<'js> {
149166
ctx: Ctx<'js>,
150167
name: String,
151168
source: Vec<u8>,
152-
) -> Result<(), LoadError> {
169+
imports: Vec<&'static str>,
170+
) -> Result<Vec<&'static str>, LoadError> {
153171
debug!(target: "MAPPING", "compile({name})");
154172
let module = Module::declare(ctx, name.clone(), source)?;
155173
let (module, p) = module.eval()?;
156174
let () = p.finish()?;
175+
176+
let mut exports = vec![];
177+
for import in imports {
178+
if let Ok(Some(v)) = module.get(import) {
179+
if rquickjs::Function::from_value(v).is_ok() {
180+
exports.push(import);
181+
}
182+
}
183+
}
184+
157185
self.modules.insert(name, module);
158-
Ok(())
186+
Ok(exports)
159187
}
160188

161189
async fn call_function(
@@ -178,7 +206,10 @@ impl<'js> JsModules<'js> {
178206
module_name: module_name.clone(),
179207
function: function.clone(),
180208
})?;
181-
let f = rquickjs::Function::from_value(f)?;
209+
let f = rquickjs::Function::from_value(f).map_err(|_| LoadError::UnknownFunction {
210+
module_name: module_name.clone(),
211+
function: function.clone(),
212+
})?;
182213

183214
let r = match &args[..] {
184215
[] => f.call(()),

crates/extensions/tedge_gen_mapper/src/pipeline.rs

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,14 @@
11
use crate::js_filter::JsFilter;
22
use crate::js_runtime::JsRuntime;
33
use crate::LoadError;
4+
use camino::Utf8Path;
45
use camino::Utf8PathBuf;
56
use serde_json::json;
67
use serde_json::Value;
78
use tedge_mqtt_ext::MqttMessage;
89
use tedge_mqtt_ext::TopicFilter;
910
use time::OffsetDateTime;
11+
use tracing::warn;
1012

1113
/// A chain of transformation of MQTT messages
1214
pub struct Pipeline {
@@ -118,6 +120,21 @@ impl Pipeline {
118120
}
119121
}
120122

123+
impl Stage {
124+
pub(crate) fn check(&self, pipeline: &Utf8Path) {
125+
let filter = &self.filter;
126+
if filter.no_js_process {
127+
warn!(target: "MAPPING", "Filter with no 'process' function: {}", filter.path.display());
128+
}
129+
if filter.no_js_update_config && !self.config_topics.is_empty() {
130+
warn!(target: "MAPPING", "Filter with no 'config_update' function: {}; but configured with 'config_topics' in {pipeline}", filter.path.display());
131+
}
132+
if filter.no_js_tick && filter.tick_every_seconds != 0 {
133+
warn!(target: "MAPPING", "Filter with no 'tick' function: {}; but configured with 'tick_every_seconds' in {pipeline}", filter.path.display());
134+
}
135+
}
136+
}
137+
121138
impl DateTime {
122139
pub fn now() -> Self {
123140
DateTime::try_from(OffsetDateTime::now_utc()).unwrap()

crates/extensions/tedge_gen_mapper/src/runtime.rs

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -116,11 +116,7 @@ impl MessageProcessor {
116116
for pipeline in self.pipelines.values_mut() {
117117
for stage in &mut pipeline.stages {
118118
if stage.filter.path() == path {
119-
match self
120-
.js_runtime
121-
.load_file(stage.filter.module_name(), &path)
122-
.await
123-
{
119+
match self.js_runtime.load_filter(&mut stage.filter).await {
124120
Ok(()) => {
125121
info!(target: "gen-mapper", "Reloaded filter {path}");
126122
}

0 commit comments

Comments
 (0)