Skip to content

Commit 5c16cfc

Browse files
committed
Fix memory leak
The first design was naive (i.e. it loaded the JavaScript module on each and every function call) and led to a memory leak (loading a module with the same name keeps the previous version of the module in memory). The new design fixes the issue. Memory usage is stable while translating 600 tedge measurements per second to c8y for 20 minutes (debug mode):
```shell
$ watch ps -p 1382888 -o args,%cpu,etimes,times,%mem,rss,vsz
COMMAND           %CPU ELAPSED TIME %MEM   RSS     VSZ
tedge-mapper gen  47.4    1322  627  0.0 41440 1359476
$ tedge mqtt sub 'test/output' --duration 1000 | pv | wc -l
89.3MiB 0:16:40 [91.4KiB/s] [ <=> ]
586311
```
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 9773702 commit 5c16cfc

File tree

8 files changed

+309
-137
lines changed

8 files changed

+309
-137
lines changed

crates/extensions/tedge_gen_mapper/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,15 @@ repository.workspace = true
1212
anyhow = { workspace = true }
1313
async-trait = { workspace = true }
1414
camino = { workspace = true, features = ["serde1"] }
15-
rquickjs = { workspace = true, features = ["futures", "parallel"] }
15+
rquickjs = { workspace = true, default-features = false, features = ["futures", "parallel"] }
1616
serde = { workspace = true, features = ["derive"] }
1717
serde_json = { workspace = true }
1818
tedge_actors = { workspace = true }
1919
tedge_file_system_ext = { workspace = true }
2020
tedge_mqtt_ext = { workspace = true }
2121
thiserror = { workspace = true }
2222
time = { workspace = true }
23-
tokio = { workspace = true, features = ["fs", "macros", "time"] }
23+
tokio = { workspace = true, features = ["fs", "macros", "time", "sync"] }
2424
toml = { workspace = true, features = ["parse"] }
2525
tracing = { workspace = true }
2626

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
export function process (timestamp, message, config) {
2+
return [{
3+
topic: config?.topic || "te/error",
4+
payload: message.payload
5+
}]
6+
}

crates/extensions/tedge_gen_mapper/src/actor.rs

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use crate::config::PipelineConfig;
2-
use crate::js_filter::JsRuntime;
2+
use crate::js_runtime::JsRuntime;
33
use crate::pipeline::DateTime;
44
use crate::pipeline::Message;
55
use crate::pipeline::Pipeline;
@@ -83,9 +83,8 @@ impl GenMapper {
8383
for stage in &mut pipeline.stages {
8484
if stage.filter.path() == path {
8585
match self.js_runtime.load_file(&path).await {
86-
Ok(filter) => {
86+
Ok(()) => {
8787
info!("Reloaded filter {path}");
88-
stage.filter = filter
8988
}
9089
Err(e) => {
9190
error!("Failed to reload filter {path}: {e}");
@@ -172,6 +171,9 @@ impl GenMapper {
172171

173172
async fn tick(&mut self) -> Result<(), RuntimeError> {
174173
let timestamp = DateTime::now();
174+
if timestamp.seconds % 300 == 0 {
175+
self.js_runtime.dump_memory_stats().await;
176+
}
175177
for (pipeline_id, pipeline) in self.pipelines.iter_mut() {
176178
match pipeline.tick(&self.js_runtime, &timestamp).await {
177179
Ok(messages) => {

crates/extensions/tedge_gen_mapper/src/config.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
1-
use crate::js_filter::JsRuntime;
1+
use crate::js_filter::JsFilter;
2+
use crate::js_runtime::JsRuntime;
23
use crate::pipeline::Pipeline;
34
use crate::pipeline::Stage;
45
use crate::LoadError;
@@ -65,13 +66,12 @@ impl PipelineConfig {
6566
}
6667

6768
impl StageConfig {
68-
pub fn compile(self, js_runtime: &JsRuntime, config_dir: &Path) -> Result<Stage, ConfigError> {
69+
pub fn compile(self, _js_runtime: &JsRuntime, config_dir: &Path) -> Result<Stage, ConfigError> {
6970
let path = match self.filter {
7071
FilterSpec::JavaScript(path) if path.is_absolute() => path.into(),
7172
FilterSpec::JavaScript(path) => config_dir.join(path),
7273
};
73-
let filter = js_runtime
74-
.loaded_module(path)?
74+
let filter = JsFilter::new(path)
7575
.with_config(self.config)
7676
.with_tick_every_seconds(self.tick_every_seconds);
7777
let config_topics = topic_filters(&self.meta_topics)?;

crates/extensions/tedge_gen_mapper/src/js_filter.rs

Lines changed: 70 additions & 124 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
1+
use crate::js_runtime::JsRuntime;
12
use crate::pipeline;
23
use crate::pipeline::DateTime;
34
use crate::pipeline::FilterError;
45
use crate::pipeline::Message;
5-
use crate::LoadError;
6+
use anyhow::Context;
67
use rquickjs::Ctx;
78
use rquickjs::FromJs;
89
use rquickjs::IntoJs;
9-
use rquickjs::Object;
1010
use rquickjs::Value;
11-
use std::collections::HashMap;
1211
use std::path::Path;
1312
use std::path::PathBuf;
1413
use tracing::debug;
@@ -20,7 +19,7 @@ pub struct JsFilter {
2019
tick_every_seconds: u64,
2120
}
2221

23-
#[derive(Clone, Default)]
22+
#[derive(Clone, Debug, Default)]
2423
pub struct JsonValue(serde_json::Value);
2524

2625
impl JsFilter {
@@ -32,6 +31,10 @@ impl JsFilter {
3231
}
3332
}
3433

34+
pub fn module_name(&self) -> String {
35+
self.path.display().to_string()
36+
}
37+
3538
pub fn with_config(self, config: Option<serde_json::Value>) -> Self {
3639
if let Some(config) = config {
3740
Self {
@@ -68,11 +71,16 @@ impl JsFilter {
6871
timestamp: &DateTime,
6972
message: &Message,
7073
) -> Result<Vec<Message>, FilterError> {
71-
debug!(target: "MAPPING", "{}: process({timestamp:?}, {message:?})", self.path.display());
72-
let input = (timestamp.clone(), message.clone(), self.config.clone());
73-
js.call_function(self, "process", input)
74+
debug!(target: "MAPPING", "{}: process({timestamp:?}, {message:?})", self.module_name());
75+
let input = vec![
76+
timestamp.clone().into(),
77+
message.clone().into(),
78+
self.config.clone(),
79+
];
80+
js.call_function(&self.path, "process", input)
7481
.await
75-
.map_err(pipeline::error_from_js)
82+
.map_err(pipeline::error_from_js)?
83+
.try_into()
7684
}
7785

7886
/// Update the filter config using a metadata message
@@ -87,10 +95,10 @@ impl JsFilter {
8795
js: &JsRuntime,
8896
message: &Message,
8997
) -> Result<(), FilterError> {
90-
debug!(target: "MAPPING", "{}: update_config({message:?})", self.path.display());
91-
let input = (message.clone(), self.config.clone());
98+
debug!(target: "MAPPING", "{}: update_config({message:?})", self.module_name());
99+
let input = vec![message.clone().into(), self.config.clone()];
92100
let config = js
93-
.call_function(self, "update_config", input)
101+
.call_function(&self.path, "update_config", input)
94102
.await
95103
.map_err(pipeline::error_from_js)?;
96104
self.config = config;
@@ -112,143 +120,78 @@ impl JsFilter {
112120
if !timestamp.tick_now(self.tick_every_seconds) {
113121
return Ok(vec![]);
114122
}
115-
debug!(target: "MAPPING", "{}: tick({timestamp:?})", self.path.display());
116-
let input = (timestamp.clone(), self.config.clone());
117-
js.call_function(self, "tick", input)
123+
debug!(target: "MAPPING", "{}: tick({timestamp:?})", self.module_name());
124+
let input = vec![timestamp.clone().into(), self.config.clone()];
125+
js.call_function(&self.path, "tick", input)
118126
.await
119-
.map_err(pipeline::error_from_js)
127+
.map_err(pipeline::error_from_js)?
128+
.try_into()
120129
}
121130
}
122131

123-
pub struct JsRuntime {
124-
context: rquickjs::AsyncContext,
125-
modules: HashMap<PathBuf, Vec<u8>>,
132+
impl From<Message> for JsonValue {
133+
fn from(value: Message) -> Self {
134+
JsonValue(value.json())
135+
}
126136
}
127137

128-
impl JsRuntime {
129-
pub async fn try_new() -> Result<Self, LoadError> {
130-
let runtime = rquickjs::AsyncRuntime::new()?;
131-
let context = rquickjs::AsyncContext::full(&runtime).await?;
132-
let modules = HashMap::new();
133-
Ok(JsRuntime { context, modules })
138+
impl From<DateTime> for JsonValue {
139+
fn from(value: DateTime) -> Self {
140+
JsonValue(value.json())
134141
}
142+
}
135143

136-
pub async fn load_file(&mut self, path: impl AsRef<Path>) -> Result<JsFilter, LoadError> {
137-
let path = path.as_ref();
138-
let source = tokio::fs::read_to_string(path).await?;
139-
self.load_js(path, source)
140-
}
144+
impl TryFrom<serde_json::Value> for Message {
145+
type Error = FilterError;
141146

142-
pub fn load_js(
143-
&mut self,
144-
path: impl AsRef<Path>,
145-
source: impl Into<Vec<u8>>,
146-
) -> Result<JsFilter, LoadError> {
147-
let path = path.as_ref().to_path_buf();
148-
self.modules.insert(path.clone(), source.into());
149-
Ok(JsFilter::new(path))
147+
fn try_from(value: serde_json::Value) -> Result<Self, Self::Error> {
148+
let message = serde_json::from_value(value)
149+
.with_context(|| "Couldn't extract message payload and topic")?;
150+
Ok(message)
150151
}
152+
}
151153

152-
pub fn loaded_module(&self, path: PathBuf) -> Result<JsFilter, LoadError> {
153-
match self.modules.get(&path) {
154-
None => Err(LoadError::ScriptNotLoaded { path }),
155-
Some(_) => Ok(JsFilter::new(path)),
156-
}
157-
}
154+
impl TryFrom<JsonValue> for Message {
155+
type Error = FilterError;
158156

159-
pub async fn call_function<Args, Ret>(
160-
&self,
161-
module: &JsFilter,
162-
function: &str,
163-
args: Args,
164-
) -> Result<Ret, LoadError>
165-
where
166-
for<'a> Args: rquickjs::function::IntoArgs<'a> + Send + 'a,
167-
for<'a> Ret: FromJs<'a> + Send + 'a,
168-
{
169-
let Some(source) = self.modules.get(&module.path) else {
170-
return Err(LoadError::ScriptNotLoaded {
171-
path: module.path.clone(),
172-
});
173-
};
174-
175-
let name = module.path.display().to_string();
176-
177-
rquickjs::async_with!(self.context => |ctx| {
178-
debug!(target: "MAPPING", "compile({name})");
179-
let m = rquickjs::Module::declare(ctx.clone(), name.clone(), source.clone())?;
180-
let (m,p) = m.eval()?;
181-
let () = p.finish()?;
182-
183-
debug!(target: "MAPPING", "link({name})");
184-
let f: rquickjs::Value = m.get(function)?;
185-
let f = rquickjs::Function::from_value(f)?;
186-
187-
debug!(target: "MAPPING", "execute({name})");
188-
let r = f.call(args);
189-
if r.is_err() {
190-
if let Some(ex) = ctx.catch().as_exception() {
191-
let err = anyhow::anyhow!("{ex}");
192-
Err(err.context("JS raised exception").into())
193-
} else {
194-
let err = r.err().unwrap();
195-
debug!(target: "MAPPING", "execute({name}) => {err:?}");
196-
Err(err.into())
197-
}
198-
} else {
199-
Ok(r.unwrap())
200-
}
201-
})
202-
.await
157+
fn try_from(value: JsonValue) -> Result<Self, Self::Error> {
158+
Message::try_from(value.0)
203159
}
204160
}
205161

206-
impl<'js> FromJs<'js> for Message {
207-
fn from_js(_ctx: &Ctx<'js>, value: Value<'js>) -> rquickjs::Result<Self> {
208-
debug!(target: "MAPPING", "from_js(...)");
209-
match value.as_object() {
210-
None => Ok(Message {
211-
topic: "".to_string(),
212-
payload: "".to_string(),
213-
}),
214-
Some(object) => {
215-
let topic = object.get("topic");
216-
let payload = object.get("payload");
217-
debug!(target: "MAPPING", "from_js(...) -> topic = {:?}, payload = {:?}", topic, payload);
218-
Ok(Message {
219-
topic: topic?,
220-
payload: payload?,
221-
})
162+
impl TryFrom<JsonValue> for Vec<Message> {
163+
type Error = FilterError;
164+
165+
fn try_from(value: JsonValue) -> Result<Self, Self::Error> {
166+
match value.0 {
167+
serde_json::Value::Array(array) => array.into_iter().map(Message::try_from).collect(),
168+
serde_json::Value::Object(map) => {
169+
Message::try_from(serde_json::Value::Object(map)).map(|message| vec![message])
222170
}
171+
_ => Err(anyhow::anyhow!("Filters are expected to return an array of messages").into()),
223172
}
224173
}
225174
}
226175

227-
impl<'js> IntoJs<'js> for Message {
176+
struct JsonValueRef<'a>(&'a serde_json::Value);
177+
178+
impl<'js> IntoJs<'js> for JsonValue {
228179
fn into_js(self, ctx: &Ctx<'js>) -> rquickjs::Result<Value<'js>> {
229-
debug!(target: "MAPPING", "into_js({self:?})");
230-
let msg = Object::new(ctx.clone())?;
231-
msg.set("topic", self.topic)?;
232-
msg.set("payload", self.payload)?;
233-
Ok(Value::from_object(msg))
180+
JsonValueRef(&self.0).into_js(ctx)
234181
}
235182
}
236183

237-
impl<'js> IntoJs<'js> for DateTime {
184+
impl<'js> IntoJs<'js> for &JsonValue {
238185
fn into_js(self, ctx: &Ctx<'js>) -> rquickjs::Result<Value<'js>> {
239-
debug!(target: "MAPPING", "into_js({self:?})");
240-
let msg = Object::new(ctx.clone())?;
241-
msg.set("seconds", self.seconds)?;
242-
msg.set("nanoseconds", self.nanoseconds)?;
243-
Ok(Value::from_object(msg))
186+
JsonValueRef(&self.0).into_js(ctx)
244187
}
245188
}
246189

247-
impl<'js> IntoJs<'js> for JsonValue {
190+
impl<'a, 'js> IntoJs<'js> for JsonValueRef<'a> {
248191
fn into_js(self, ctx: &Ctx<'js>) -> rquickjs::Result<Value<'js>> {
249192
match self.0 {
250193
serde_json::Value::Null => Ok(Value::new_null(ctx.clone())),
251-
serde_json::Value::Bool(value) => Ok(Value::new_bool(ctx.clone(), value)),
194+
serde_json::Value::Bool(value) => Ok(Value::new_bool(ctx.clone(), *value)),
252195
serde_json::Value::Number(value) => {
253196
if let Some(n) = value.as_i64() {
254197
if let Ok(n) = i32::try_from(n) {
@@ -262,20 +205,20 @@ impl<'js> IntoJs<'js> for JsonValue {
262205
Ok(nan.into_value())
263206
}
264207
serde_json::Value::String(value) => {
265-
let string = rquickjs::String::from_str(ctx.clone(), &value)?;
208+
let string = rquickjs::String::from_str(ctx.clone(), value)?;
266209
Ok(string.into_value())
267210
}
268211
serde_json::Value::Array(values) => {
269212
let array = rquickjs::Array::new(ctx.clone())?;
270-
for (i, value) in values.into_iter().enumerate() {
271-
array.set(i, JsonValue(value))?;
213+
for (i, value) in values.iter().enumerate() {
214+
array.set(i, JsonValueRef(value))?;
272215
}
273216
Ok(array.into_value())
274217
}
275218
serde_json::Value::Object(values) => {
276219
let object = rquickjs::Object::new(ctx.clone())?;
277220
for (key, value) in values.into_iter() {
278-
object.set(key, JsonValue(value))?;
221+
object.set(key, JsonValueRef(value))?;
279222
}
280223
Ok(object.into_value())
281224
}
@@ -327,7 +270,8 @@ mod tests {
327270
async fn identity_filter() {
328271
let script = "export function process(t,msg) { return [msg]; };";
329272
let mut runtime = JsRuntime::try_new().await.unwrap();
330-
let filter = runtime.load_js("id.js", script).unwrap();
273+
runtime.load_js("id.js", script).await.unwrap();
274+
let filter = JsFilter::new("id.js".into());
331275

332276
let input = Message::new("te/main/device///m/", "hello world");
333277
let output = input.clone();
@@ -344,7 +288,8 @@ mod tests {
344288
async fn error_filter() {
345289
let script = r#"export function process(t,msg) { throw new Error("Cannot process that message"); };"#;
346290
let mut runtime = JsRuntime::try_new().await.unwrap();
347-
let filter = runtime.load_js("err.js", script).unwrap();
291+
runtime.load_js("err.js", script).await.unwrap();
292+
let filter = JsFilter::new("err.js".into());
348293

349294
let input = Message::new("te/main/device///m/", "hello world");
350295
let error = filter
@@ -379,7 +324,8 @@ export function process (timestamp, message, config) {
379324
}
380325
"#;
381326
let mut runtime = JsRuntime::try_new().await.unwrap();
382-
let filter = runtime.load_js("collectd.js", script).unwrap();
327+
runtime.load_js("collectd.js", script).await.unwrap();
328+
let filter = JsFilter::new("collectd.js".into());
383329

384330
let input = Message::new(
385331
"collectd/h/memory/percent-used",

0 commit comments

Comments
 (0)