Skip to content

Commit 094119f

Browse files
committed
Fix JS engine with one module per filter instance
Now, each instance of a script is given its own static state Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent d798cb8 commit 094119f

File tree

10 files changed

+135
-122
lines changed

10 files changed

+135
-122
lines changed

crates/extensions/tedge_gen_mapper/src/actor.rs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -61,9 +61,7 @@ impl Actor for GenMapper {
6161
let Ok(path) = Utf8PathBuf::try_from(path) else {
6262
continue;
6363
};
64-
if matches!(path.extension(), Some("js" | "ts")) {
65-
self.processor.add_filter(path).await;
66-
} else if path.extension() == Some("toml") {
64+
if matches!(path.extension(), Some("toml")) {
6765
self.processor.add_pipeline(path).await;
6866
self.send_updated_subscriptions().await?;
6967
}

crates/extensions/tedge_gen_mapper/src/config.rs

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,11 @@ use crate::js_runtime::JsRuntime;
33
use crate::pipeline::Pipeline;
44
use crate::pipeline::Stage;
55
use crate::LoadError;
6+
use camino::Utf8Path;
67
use camino::Utf8PathBuf;
78
use serde::Deserialize;
89
use serde_json::Value;
10+
use std::fmt::Debug;
911
use std::path::Path;
1012
use tedge_mqtt_ext::TopicFilter;
1113

@@ -59,34 +61,43 @@ impl PipelineConfig {
5961
}
6062
}
6163

62-
pub fn compile(
64+
pub async fn compile(
6365
self,
64-
js_runtime: &JsRuntime,
66+
js_runtime: &mut JsRuntime,
6567
config_dir: &Path,
6668
source: Utf8PathBuf,
6769
) -> Result<Pipeline, ConfigError> {
68-
let input = topic_filters(&self.input_topics)?;
69-
let stages = self
70-
.stages
71-
.into_iter()
72-
.map(|stage| stage.compile(js_runtime, config_dir))
73-
.collect::<Result<Vec<_>, _>>()?;
70+
let input_topics = topic_filters(&self.input_topics)?;
71+
let mut stages = vec![];
72+
for (i, stage) in self.stages.into_iter().enumerate() {
73+
let stage = stage.compile(config_dir, i, &source).await?;
74+
let filter = &stage.filter;
75+
js_runtime
76+
.load_file(filter.module_name(), filter.path())
77+
.await?;
78+
stages.push(stage);
79+
}
7480
Ok(Pipeline {
75-
input_topics: input,
81+
input_topics,
7682
stages,
7783
source,
7884
})
7985
}
8086
}
8187

8288
impl StageConfig {
83-
pub fn compile(self, _js_runtime: &JsRuntime, config_dir: &Path) -> Result<Stage, ConfigError> {
89+
pub async fn compile(
90+
self,
91+
config_dir: &Path,
92+
index: usize,
93+
pipeline: &Utf8Path,
94+
) -> Result<Stage, ConfigError> {
8495
let path = match self.filter {
8596
FilterSpec::JavaScript(path) if path.is_absolute() => path.into(),
8697
FilterSpec::JavaScript(path) if path.starts_with(config_dir) => path.into(),
8798
FilterSpec::JavaScript(path) => config_dir.join(path),
8899
};
89-
let filter = JsFilter::new(path)
100+
let filter = JsFilter::new(pipeline.to_owned().into(), index, path)
90101
.with_config(self.config)
91102
.with_tick_every_seconds(self.tick_every_seconds);
92103
let config_topics = topic_filters(&self.meta_topics)?;

crates/extensions/tedge_gen_mapper/src/js_filter.rs

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ use tracing::debug;
1414

1515
#[derive(Clone)]
1616
pub struct JsFilter {
17+
pub module_name: String,
1718
pub path: PathBuf,
1819
pub config: JsonValue,
1920
pub tick_every_seconds: u64,
@@ -23,16 +24,18 @@ pub struct JsFilter {
2324
pub struct JsonValue(serde_json::Value);
2425

2526
impl JsFilter {
26-
pub fn new(path: PathBuf) -> Self {
27+
pub fn new(pipeline: PathBuf, index: usize, path: PathBuf) -> Self {
28+
let module_name = format!("{}|{}|{}", pipeline.display(), index, path.display());
2729
JsFilter {
30+
module_name,
2831
path,
2932
config: JsonValue::default(),
3033
tick_every_seconds: 0,
3134
}
3235
}
3336

3437
pub fn module_name(&self) -> String {
35-
self.path.display().to_string()
38+
self.module_name.to_owned()
3639
}
3740

3841
pub fn with_config(self, config: Option<serde_json::Value>) -> Self {
@@ -77,7 +80,7 @@ impl JsFilter {
7780
message.clone().into(),
7881
self.config.clone(),
7982
];
80-
js.call_function(&self.path, "process", input)
83+
js.call_function(&self.module_name(), "process", input)
8184
.await
8285
.map_err(pipeline::error_from_js)?
8386
.try_into()
@@ -98,7 +101,7 @@ impl JsFilter {
98101
debug!(target: "MAPPING", "{}: update_config({message:?})", self.module_name());
99102
let input = vec![message.clone().into(), self.config.clone()];
100103
let config = js
101-
.call_function(&self.path, "update_config", input)
104+
.call_function(&self.module_name(), "update_config", input)
102105
.await
103106
.map_err(pipeline::error_from_js)?;
104107
self.config = config;
@@ -122,7 +125,7 @@ impl JsFilter {
122125
}
123126
debug!(target: "MAPPING", "{}: tick({timestamp:?})", self.module_name());
124127
let input = vec![timestamp.clone().into(), self.config.clone()];
125-
js.call_function(&self.path, "tick", input)
128+
js.call_function(&self.module_name(), "tick", input)
126129
.await
127130
.map_err(pipeline::error_from_js)?
128131
.try_into()
@@ -281,8 +284,8 @@ mod tests {
281284
async fn identity_filter() {
282285
let script = "export function process(t,msg) { return [msg]; };";
283286
let mut runtime = JsRuntime::try_new().await.unwrap();
284-
runtime.load_js("id.js", script).await.unwrap();
285-
let filter = JsFilter::new("id.js".into());
287+
let filter = JsFilter::new("id.toml".into(), 1, "id.js".into());
288+
runtime.load_js(filter.module_name(), script).await.unwrap();
286289

287290
let input = Message::new("te/main/device///m/", "hello world");
288291
let output = input.clone();
@@ -299,8 +302,8 @@ mod tests {
299302
async fn error_filter() {
300303
let script = r#"export function process(t,msg) { throw new Error("Cannot process that message"); };"#;
301304
let mut runtime = JsRuntime::try_new().await.unwrap();
302-
runtime.load_js("err.js", script).await.unwrap();
303-
let filter = JsFilter::new("err.js".into());
305+
let filter = JsFilter::new("err.toml".into(), 1, "err.js".into());
306+
runtime.load_js(filter.module_name(), script).await.unwrap();
304307

305308
let input = Message::new("te/main/device///m/", "hello world");
306309
let error = filter
@@ -335,8 +338,8 @@ export function process (timestamp, message, config) {
335338
}
336339
"#;
337340
let mut runtime = JsRuntime::try_new().await.unwrap();
338-
runtime.load_js("collectd.js", script).await.unwrap();
339-
let filter = JsFilter::new("collectd.js".into());
341+
let filter = JsFilter::new("collectd.toml".into(), 1, "collectd.js".into());
342+
runtime.load_js(filter.module_name(), script).await.unwrap();
340343

341344
let input = Message::new(
342345
"collectd/h/memory/percent-used",

crates/extensions/tedge_gen_mapper/src/js_runtime.rs

Lines changed: 9 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,20 +23,22 @@ impl JsRuntime {
2323
Ok(JsRuntime { runtime, worker })
2424
}
2525

26-
pub async fn load_file(&mut self, path: impl AsRef<Path>) -> Result<(), LoadError> {
26+
pub async fn load_file(
27+
&mut self,
28+
module_name: String,
29+
path: impl AsRef<Path>,
30+
) -> Result<(), LoadError> {
2731
let path = path.as_ref();
2832
let source = tokio::fs::read_to_string(path).await?;
29-
self.load_js(path, source).await
33+
self.load_js(module_name, source).await
3034
}
3135

3236
pub async fn load_js(
3337
&mut self,
34-
path: impl AsRef<Path>,
38+
name: String,
3539
source: impl Into<Vec<u8>>,
3640
) -> Result<(), LoadError> {
3741
let (sender, receiver) = oneshot::channel();
38-
let path = path.as_ref().to_path_buf();
39-
let name = path.display().to_string();
4042
let source = source.into();
4143
self.worker
4244
.send(JsRequest::LoadModule {
@@ -51,14 +53,14 @@ impl JsRuntime {
5153

5254
pub async fn call_function(
5355
&self,
54-
module: &Path,
56+
module: &str,
5557
function: &str,
5658
args: Vec<JsonValue>,
5759
) -> Result<JsonValue, LoadError> {
5860
let (sender, receiver) = oneshot::channel();
5961
self.worker
6062
.send(JsRequest::CallFunction {
61-
module: module.display().to_string(),
63+
module: module.to_string(),
6264
function: function.to_string(),
6365
args,
6466
sender,

0 commit comments

Comments
 (0)