Skip to content

Commit fc01710

Browse files
committed
feat(plugin): netdata collector of thin-edge measurements
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent 08577c9 commit fc01710

File tree

9 files changed

+442
-1
lines changed

9 files changed

+442
-1
lines changed

Cargo.lock

Lines changed: 101 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ members = [
66
"crates/tests/*",
77
"plugins/c8y_firmware_plugin",
88
"plugins/c8y_remote_access_plugin",
9+
"plugins/netdata-collector",
910
"plugins/tedge_apt_plugin",
1011
]
1112
resolver = "2"
Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,21 @@
1+
[package]
2+
name = "netdata_collector"
3+
version.workspace = true
4+
authors.workspace = true
5+
edition.workspace = true
6+
rust-version.workspace = true
7+
license.workspace = true
8+
homepage.workspace = true
9+
repository.workspace = true
10+
11+
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
12+
13+
[dependencies]
14+
async-trait = { workspace = true }
15+
netdata-plugin = "0.2.0"
16+
tedge_actors = { workspace = true }
17+
tedge_api = { workspace = true }
18+
time = { workspace = true }
19+
20+
[lints]
21+
workspace = true
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
use crate::message::MetricPoints;
2+
use crate::TEdgeNetDataCollectorBuilder;
3+
use netdata_plugin::collector::Collector;
4+
use netdata_plugin::Chart;
5+
use netdata_plugin::Dimension;
6+
use std::collections::HashMap;
7+
use std::collections::HashSet;
8+
use tedge_actors::Actor;
9+
use tedge_actors::LoggingReceiver;
10+
use tedge_actors::MessageReceiver;
11+
use tedge_actors::RuntimeError;
12+
13+
pub struct TEdgeNetDataCollector {
14+
pub(crate) input: LoggingReceiver<MetricPoints>,
15+
}
16+
17+
impl TEdgeNetDataCollector {
18+
pub fn builder() -> TEdgeNetDataCollectorBuilder {
19+
TEdgeNetDataCollectorBuilder::default()
20+
}
21+
}
22+
23+
#[async_trait::async_trait]
24+
impl Actor for TEdgeNetDataCollector {
25+
fn name(&self) -> &str {
26+
"NetData"
27+
}
28+
29+
async fn run(mut self) -> Result<(), RuntimeError> {
30+
let mut writer = std::io::stdout();
31+
let mut c = Collector::new(&mut writer);
32+
let mut charts = HashMap::new();
33+
34+
while let Some(points) = self.input.recv().await {
35+
// Declare any new chart
36+
let updated_charts: HashSet<String> =
37+
points.iter().map(|p| p.chart_id.clone()).collect();
38+
for chart_id in updated_charts.iter() {
39+
if !charts.contains_key(chart_id) {
40+
let chart = new_chart(chart_id);
41+
c.add_chart(&chart).unwrap();
42+
charts.insert(chart_id.to_string(), HashSet::new());
43+
}
44+
}
45+
46+
// Declare any new dimension
47+
for p in points.iter() {
48+
if let Some(dims) = charts.get_mut(&p.chart_id) {
49+
let dim_id = p.dimension_id.clone();
50+
if !dims.contains(&dim_id) {
51+
let dim = new_dim(&dim_id);
52+
c.add_dimension(&p.chart_id, &dim).unwrap();
53+
dims.insert(dim_id);
54+
}
55+
}
56+
}
57+
58+
// Publish the metrics
59+
for p in points {
60+
c.prepare_value(&p.chart_id, &p.dimension_id, p.value)
61+
.unwrap();
62+
}
63+
for chart_id in updated_charts {
64+
c.commit_chart(&chart_id).unwrap();
65+
}
66+
}
67+
68+
Ok(())
69+
}
70+
}
71+
72+
fn new_chart(chart_id: &str) -> Chart {
73+
Chart {
74+
type_id: chart_id,
75+
name: chart_id,
76+
title: chart_id,
77+
units: "units",
78+
..Default::default()
79+
}
80+
}
81+
82+
fn new_dim(dim_id: &str) -> Dimension {
83+
Dimension {
84+
id: dim_id,
85+
name: dim_id,
86+
..Default::default()
87+
}
88+
}
Lines changed: 51 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
use crate::MetricPoints;
2+
use crate::TEdgeNetDataCollector;
3+
use std::convert::Infallible;
4+
use tedge_actors::futures::channel::mpsc;
5+
use tedge_actors::Builder;
6+
use tedge_actors::CloneSender;
7+
use tedge_actors::DynSender;
8+
use tedge_actors::LoggingReceiver;
9+
use tedge_actors::MessageSink;
10+
use tedge_actors::RuntimeRequest;
11+
use tedge_actors::RuntimeRequestSink;
12+
13+
pub struct TEdgeNetDataCollectorBuilder {
14+
input: LoggingReceiver<MetricPoints>,
15+
input_sender: DynSender<MetricPoints>,
16+
signal_sender: DynSender<RuntimeRequest>,
17+
}
18+
19+
impl Default for TEdgeNetDataCollectorBuilder {
20+
fn default() -> Self {
21+
let (input_sender, input_receiver) = mpsc::channel(10);
22+
let (signal_sender, signal_receiver) = mpsc::channel(10);
23+
let input = LoggingReceiver::new("NetData".into(), input_receiver, signal_receiver);
24+
25+
TEdgeNetDataCollectorBuilder {
26+
input,
27+
input_sender: input_sender.into(),
28+
signal_sender: signal_sender.into(),
29+
}
30+
}
31+
}
32+
33+
impl MessageSink<MetricPoints> for TEdgeNetDataCollectorBuilder {
34+
fn get_sender(&self) -> DynSender<MetricPoints> {
35+
self.input_sender.sender_clone()
36+
}
37+
}
38+
39+
impl RuntimeRequestSink for TEdgeNetDataCollectorBuilder {
40+
fn get_signal_sender(&self) -> DynSender<RuntimeRequest> {
41+
self.signal_sender.sender_clone()
42+
}
43+
}
44+
45+
impl Builder<TEdgeNetDataCollector> for TEdgeNetDataCollectorBuilder {
46+
type Error = Infallible;
47+
48+
fn try_build(self) -> Result<TEdgeNetDataCollector, Self::Error> {
49+
Ok(TEdgeNetDataCollector { input: self.input })
50+
}
51+
}
Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mod actor;
2+
mod builder;
3+
mod message;
4+
5+
pub use actor::*;
6+
pub use builder::*;
7+
pub use message::*;

0 commit comments

Comments
 (0)