Skip to content

Commit 6c15dfa

Browse files
committed
collect message transformation stats
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 073e2f6 commit 6c15dfa

File tree

6 files changed

+270
-3
lines changed

6 files changed

+270
-3
lines changed

crates/extensions/tedge_gen_mapper/src/actor.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,7 @@ impl GenMapper {
133133
let timestamp = DateTime::now();
134134
if timestamp.seconds % 300 == 0 {
135135
self.processor.dump_memory_stats().await;
136+
self.processor.dump_processing_stats().await;
136137
}
137138
for (pipeline_id, pipeline_messages) in self.processor.tick(&timestamp).await {
138139
match pipeline_messages {

crates/extensions/tedge_gen_mapper/src/js_filter.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,10 @@ impl JsFilter {
7272
&self.path
7373
}
7474

75+
pub fn source(&self) -> String {
76+
format!("{}", self.path.display())
77+
}
78+
7579
/// Process a message returning zero, one or more messages
7680
///
7781
/// The "process" function of the JS module is passed 3 arguments

crates/extensions/tedge_gen_mapper/src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ mod js_filter;
44
mod js_runtime;
55
pub mod pipeline;
66
mod runtime;
7+
mod stats;
78

89
use crate::actor::GenMapper;
910
pub use crate::runtime::MessageProcessor;

crates/extensions/tedge_gen_mapper/src/pipeline.rs

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use crate::js_filter::JsFilter;
22
use crate::js_runtime::JsRuntime;
3+
use crate::stats::Counter;
34
use crate::LoadError;
45
use camino::Utf8Path;
56
use camino::Utf8PathBuf;
@@ -76,6 +77,7 @@ impl Pipeline {
7677
pub async fn process(
7778
&mut self,
7879
js_runtime: &JsRuntime,
80+
stats: &mut Counter,
7981
timestamp: &DateTime,
8082
message: &Message,
8183
) -> Result<Vec<Message>, FilterError> {
@@ -84,38 +86,66 @@ impl Pipeline {
8486
return Ok(vec![]);
8587
}
8688

89+
let stated_at = stats.pipeline_process_start(self.source.as_str());
8790
let mut messages = vec![message.clone()];
8891
for stage in self.stages.iter() {
92+
let js = stage.filter.source();
8993
let mut transformed_messages = vec![];
9094
for message in messages.iter() {
95+
let filter_started_at = stats.filter_start(&js, "process");
9196
let filter_output = stage.filter.process(js_runtime, timestamp, message).await;
97+
match &filter_output {
98+
Ok(messages) => {
99+
stats.filter_done(&js, "process", filter_started_at, messages.len())
100+
}
101+
Err(_) => stats.filter_failed(&js, "process"),
102+
}
92103
transformed_messages.extend(filter_output?);
93104
}
94105
messages = transformed_messages;
95106
}
107+
108+
stats.pipeline_process_done(self.source.as_str(), stated_at, messages.len());
96109
Ok(messages)
97110
}
98111

99112
pub async fn tick(
100113
&mut self,
101114
js_runtime: &JsRuntime,
115+
stats: &mut Counter,
102116
timestamp: &DateTime,
103117
) -> Result<Vec<Message>, FilterError> {
118+
let stated_at = stats.pipeline_tick_start(self.source.as_str());
104119
let mut messages = vec![];
105120
for stage in self.stages.iter() {
121+
let js = stage.filter.source();
106122
// Process first the messages triggered upstream by the tick
107123
let mut transformed_messages = vec![];
108124
for message in messages.iter() {
125+
let filter_started_at = stats.filter_start(&js, "process");
109126
let filter_output = stage.filter.process(js_runtime, timestamp, message).await;
127+
match &filter_output {
128+
Ok(messages) => {
129+
stats.filter_done(&js, "process", filter_started_at, messages.len())
130+
}
131+
Err(_) => stats.filter_failed(&js, "process"),
132+
}
110133
transformed_messages.extend(filter_output?);
111134
}
112135

113136
// Only then process the tick
114-
transformed_messages.extend(stage.filter.tick(js_runtime, timestamp).await?);
137+
let filter_started_at = stats.filter_start(&js, "tick");
138+
let tick_output = stage.filter.tick(js_runtime, timestamp).await;
139+
match &tick_output {
140+
Ok(messages) => stats.filter_done(&js, "tick", filter_started_at, messages.len()),
141+
Err(_) => stats.filter_failed(&js, "tick"),
142+
}
143+
transformed_messages.extend(tick_output?);
115144

116145
// Iterate with all the messages collected at this stage
117146
messages = transformed_messages;
118147
}
148+
stats.pipeline_tick_done(self.source.as_str(), stated_at, messages.len());
119149
Ok(messages)
120150
}
121151
}

crates/extensions/tedge_gen_mapper/src/runtime.rs

Lines changed: 30 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ use crate::pipeline::DateTime;
44
use crate::pipeline::FilterError;
55
use crate::pipeline::Message;
66
use crate::pipeline::Pipeline;
7+
use crate::stats::Counter;
78
use crate::LoadError;
89
use camino::Utf8Path;
910
use camino::Utf8PathBuf;
@@ -21,6 +22,7 @@ pub struct MessageProcessor {
2122
pub config_dir: PathBuf,
2223
pub pipelines: HashMap<String, Pipeline>,
2324
pub(super) js_runtime: JsRuntime,
25+
pub stats: Counter,
2426
}
2527

2628
impl MessageProcessor {
@@ -34,11 +36,13 @@ impl MessageProcessor {
3436
let mut pipeline_specs = PipelineSpecs::default();
3537
pipeline_specs.load(&config_dir).await;
3638
let pipelines = pipeline_specs.compile(&mut js_runtime, &config_dir).await;
39+
let stats = Counter::default();
3740

3841
Ok(MessageProcessor {
3942
config_dir,
4043
pipelines,
4144
js_runtime,
45+
stats,
4246
})
4347
}
4448

@@ -52,10 +56,13 @@ impl MessageProcessor {
5256
let mut pipeline_specs = PipelineSpecs::default();
5357
pipeline_specs.load_single_pipeline(&pipeline).await;
5458
let pipelines = pipeline_specs.compile(&mut js_runtime, &config_dir).await;
59+
let stats = Counter::default();
60+
5561
Ok(MessageProcessor {
5662
config_dir,
5763
pipelines,
5864
js_runtime,
65+
stats,
5966
})
6067
}
6168

@@ -68,10 +75,13 @@ impl MessageProcessor {
6875
let mut pipeline_specs = PipelineSpecs::default();
6976
pipeline_specs.load_single_filter(&filter).await;
7077
let pipelines = pipeline_specs.compile(&mut js_runtime, &config_dir).await;
78+
let stats = Counter::default();
79+
7180
Ok(MessageProcessor {
7281
config_dir,
7382
pipelines,
7483
js_runtime,
84+
stats,
7585
})
7686
}
7787

@@ -88,11 +98,20 @@ impl MessageProcessor {
8898
timestamp: &DateTime,
8999
message: &Message,
90100
) -> Vec<(String, Result<Vec<Message>, FilterError>)> {
101+
let started_at = self.stats.runtime_process_start();
102+
91103
let mut out_messages = vec![];
92104
for (pipeline_id, pipeline) in self.pipelines.iter_mut() {
93-
let pipeline_output = pipeline.process(&self.js_runtime, timestamp, message).await;
105+
let pipeline_output = pipeline
106+
.process(&self.js_runtime, &mut self.stats, timestamp, message)
107+
.await;
108+
if pipeline_output.is_err() {
109+
self.stats.pipeline_process_failed(pipeline_id);
110+
}
94111
out_messages.push((pipeline_id.clone(), pipeline_output));
95112
}
113+
114+
self.stats.runtime_process_done(started_at);
96115
out_messages
97116
}
98117

@@ -102,12 +121,21 @@ impl MessageProcessor {
102121
) -> Vec<(String, Result<Vec<Message>, FilterError>)> {
103122
let mut out_messages = vec![];
104123
for (pipeline_id, pipeline) in self.pipelines.iter_mut() {
105-
let pipeline_output = pipeline.tick(&self.js_runtime, timestamp).await;
124+
let pipeline_output = pipeline
125+
.tick(&self.js_runtime, &mut self.stats, timestamp)
126+
.await;
127+
if pipeline_output.is_err() {
128+
self.stats.pipeline_tick_failed(pipeline_id);
129+
}
106130
out_messages.push((pipeline_id.clone(), pipeline_output));
107131
}
108132
out_messages
109133
}
110134

135+
pub async fn dump_processing_stats(&self) {
136+
self.stats.dump_processing_stats();
137+
}
138+
111139
pub async fn dump_memory_stats(&self) {
112140
self.js_runtime.dump_memory_stats().await;
113141
}

0 commit comments

Comments
 (0)