Skip to content

Commit 8f525fa

Browse files
committed
Add support for stateful filters
Signed-off-by: Didier Wenzek <didier.wenzek@free.fr>
1 parent d2e7627 commit 8f525fa

File tree

14 files changed

+477
-13
lines changed

14 files changed

+477
-13
lines changed

crates/extensions/tedge_wasm_mapper/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ repository.workspace = true
1010

1111
[dependencies]
1212
async-trait = { workspace = true }
13-
camino = { workspace = true }
13+
camino = { workspace = true, features = ["serde1"] }
1414
serde = { workspace = true, features = ["derive"] }
1515
tedge_actors = { workspace = true }
1616
tedge_mqtt_ext = { workspace = true }

crates/extensions/tedge_wasm_mapper/components/Cargo.lock

Lines changed: 10 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/extensions/tedge_wasm_mapper/components/rust/add_timestamp/Cargo.toml

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,6 @@ edition = "2021"
77
serde_json = { workspace = true }
88
time = { workspace = true }
99
wit-bindgen = { workspace = true }
10-
wit-bindgen-rt = { workspace = true }
1110

1211
[lib]
1312
crate-type = ["cdylib"]
14-
15-

crates/extensions/tedge_wasm_mapper/components/rust/add_timestamp/src/lib.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,10 +46,12 @@ impl Guest for Component {
4646
Ok(vec![updated_message])
4747
}
4848

49+
/// Not configurable: a configuration message is accepted and ignored
4950
fn update_config(_config: Message) -> Result<(), FilterError> {
5051
Ok(())
5152
}
5253

54+
/// Stateless: a tick never produces any message
5355
fn tick(_timestamp: Datetime) -> Result<Vec<Message>, FilterError> {
5456
Ok(vec![])
5557
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "group_by_timestamp"
3+
version = "0.1.0"
4+
edition = "2021"
5+
6+
[dependencies]
7+
serde = { workspace = true }
8+
serde_json = { workspace = true }
9+
time = { workspace = true }
10+
wit-bindgen = { workspace = true }
11+
12+
[lib]
13+
crate-type = ["cdylib"]
Lines changed: 207 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,207 @@
1+
wit_bindgen::generate!({
2+
world: "tedge",
3+
path: "../../../wit/filter.wit",
4+
});
5+
6+
use crate::exports::tedge::filter::filtering::Datetime;
7+
use crate::exports::tedge::filter::filtering::Filter;
8+
use crate::exports::tedge::filter::filtering::FilterError;
9+
use crate::exports::tedge::filter::filtering::Guest;
10+
use crate::exports::tedge::filter::filtering::GuestFilter;
11+
use crate::exports::tedge::filter::filtering::Message;
12+
use serde::Deserialize;
13+
use serde::Serialize;
14+
use std::cell::RefCell;
15+
use std::collections::HashMap;
16+
use time::format_description::well_known::Rfc3339;
17+
use time::OffsetDateTime;
18+
19+
/// Marker type passed to `export!` and carrying the `Guest` implementation of this component
pub struct Component;
20+
21+
export!(Component);
22+
23+
impl Guest for Component {
24+
type Filter = GroupByTimestamp;
25+
26+
fn new_filter(config: Message) -> Filter {
27+
Filter::new(GroupByTimestamp::new(config))
28+
}
29+
}
30+
31+
/// Group thin-edge measurements occurring in the same time-window
32+
///
33+
/// This POC ignores duplicates and simply takes the latest value:
/// a measurement received twice in the same window and on the same topic is overwritten.
34+
pub struct GroupByTimestamp {
35+
/// Length of a time-window, in seconds
time_window_secs: u64,
36+
/// Measurements waiting for their window to elapse, keyed by the start of the window
/// (unix seconds, rounded down to a multiple of `time_window_secs`) and then by topic
grouped_messages: RefCell<HashMap<u64, HashMap<Topic, Measurements>>>,
37+
}
38+
39+
type Topic = String;
40+
41+
impl GuestFilter for GroupByTimestamp {
42+
fn new(_config: Message) -> Self {
43+
GroupByTimestamp {
44+
time_window_secs: 60,
45+
grouped_messages: RefCell::new(HashMap::new()),
46+
}
47+
}
48+
49+
fn process(
50+
&self,
51+
_ingestion_timestamp: Datetime,
52+
message: Message,
53+
) -> Result<Vec<Message>, FilterError> {
54+
let Ok(measurements) = serde_json::from_str::<Measurements>(&message.payload) else {
55+
return Err(FilterError::UnsupportedMessage(
56+
"Expect thin-edge measurements".to_string(),
57+
));
58+
};
59+
60+
let Some(measurement_timestamp) = &measurements.time else {
61+
return Err(FilterError::UnsupportedMessage(
62+
"Missing timestamp".to_string(),
63+
));
64+
};
65+
66+
let Ok(unix_timestamp) = OffsetDateTime::parse(measurement_timestamp, &Rfc3339)
67+
.map(|dt| dt.unix_timestamp() as u64)
68+
else {
69+
return Err(FilterError::UnsupportedMessage(
70+
"Expect Rfc3339 timestamp".to_string(),
71+
));
72+
};
73+
74+
let measurement_topic = message.topic.clone();
75+
let time_window_secs = (unix_timestamp / self.time_window_secs) * self.time_window_secs;
76+
self.grouped_messages
77+
.borrow_mut()
78+
.entry(time_window_secs)
79+
.or_default()
80+
.entry(measurement_topic)
81+
.or_default()
82+
.merge(measurements);
83+
84+
Ok(vec![])
85+
}
86+
87+
fn update_config(&self, _config: Message) -> Result<(), FilterError> {
88+
Ok(())
89+
}
90+
91+
fn tick(&self, timestamp: Datetime) -> Result<Vec<Message>, FilterError> {
92+
let time_window_secs = (timestamp.seconds / self.time_window_secs) * self.time_window_secs;
93+
let mut elapsed = vec![];
94+
for t in self.grouped_messages.borrow().keys() {
95+
if *t < time_window_secs {
96+
elapsed.push(*t);
97+
}
98+
}
99+
100+
let mut all_elapsed_messages = vec![];
101+
for t in elapsed {
102+
if let Some(elapsed_groups) = self.grouped_messages.borrow_mut().remove(&t) {
103+
let mut elapsed_messages = GroupByTimestamp::into_messages(t, elapsed_groups)?;
104+
all_elapsed_messages.append(&mut elapsed_messages);
105+
}
106+
}
107+
Ok(all_elapsed_messages)
108+
}
109+
}
110+
111+
impl GroupByTimestamp {
112+
fn into_messages(
113+
unix_timestamp: u64,
114+
group: HashMap<Topic, Measurements>,
115+
) -> Result<Vec<Message>, FilterError> {
116+
let mut elapsed_messages = vec![];
117+
let ingestion_time = time::OffsetDateTime::from_unix_timestamp(unix_timestamp as i64)
118+
.map_err(|err| {
119+
FilterError::IncorrectSetting(format!("Fail to format unix timestamp: {err}"))
120+
})?
121+
.format(&Rfc3339)
122+
.map_err(|err| {
123+
FilterError::IncorrectSetting(format!("Fail to format timestamp as Rfc3339: {err}"))
124+
})?;
125+
126+
for (topic, mut measurements) in group {
127+
measurements.time = Some(ingestion_time.clone());
128+
let payload = serde_json::to_string(&measurements).map_err(|err| {
129+
FilterError::IncorrectSetting(format!("Fail to format measurements as JSON: {err}"))
130+
})?;
131+
elapsed_messages.push(Message { topic, payload })
132+
}
133+
Ok(elapsed_messages)
134+
}
135+
}
136+
137+
/// A set of thin-edge measurements, as read from and written to a JSON payload
#[derive(Deserialize, Serialize, Default)]
138+
struct Measurements {
139+
/// Timestamp of the measurements; left out of the serialized JSON when `None`
#[serde(skip_serializing_if = "Option::is_none")]
140+
time: Option<String>,
141+
142+
/// All the other top-level fields of the payload, keyed by field name
#[serde(flatten)]
143+
pub extras: HashMap<String, Measurement>,
144+
}
145+
146+
/// A single measurement value
///
/// Untagged: the variant is selected by the shape of the JSON value.
#[derive(Deserialize, Serialize)]
147+
#[serde(untagged)]
148+
enum Measurement {
149+
/// A JSON number
Number(serde_json::Number),
150+
/// A JSON string
Text(String),
151+
/// A JSON object mapping names to numbers
Group(HashMap<String, serde_json::Number>),
152+
}
153+
154+
impl Measurements {
155+
/// Merge into this set of measurements a new set of measurements
156+
///
157+
/// Returns all the measurements that cannot be merged, if any
158+
pub fn merge(&mut self, other: Measurements) -> Option<Measurements> {
159+
let mut rejected_vals = HashMap::new();
160+
for (k, new_val) in other.extras {
161+
match self.extras.get_mut(k.as_str()) {
162+
None => {
163+
self.extras.insert(k, new_val);
164+
}
165+
Some(old_val) => {
166+
if let Some(rejected_val) = old_val.merge(new_val) {
167+
rejected_vals.insert(k, rejected_val);
168+
}
169+
}
170+
};
171+
}
172+
173+
if rejected_vals.is_empty() {
174+
None
175+
} else {
176+
Some(Measurements {
177+
time: other.time.clone(),
178+
extras: rejected_vals,
179+
})
180+
}
181+
}
182+
}
183+
184+
impl Measurement {
185+
/// Merge this measurement with a new one
186+
///
187+
/// Returns the new measurement if it cannot be merged
188+
pub fn merge(&mut self, other: Measurement) -> Option<Measurement> {
189+
match (self, other) {
190+
(Measurement::Number(ref mut left), Measurement::Number(right)) => {
191+
*left = right;
192+
None
193+
}
194+
(Measurement::Text(ref mut left), Measurement::Text(right)) => {
195+
*left = right;
196+
None
197+
}
198+
(Measurement::Group(ref mut left), Measurement::Group(right)) => {
199+
for (k, v) in right {
200+
left.insert(k, v);
201+
}
202+
None
203+
}
204+
(_, other) => Some(other),
205+
}
206+
}
207+
}

crates/extensions/tedge_wasm_mapper/components/rust/te_to_c8y/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ edition = "2021"
77
serde = { workspace = true }
88
serde_json = { workspace = true }
99
wit-bindgen = { workspace = true }
10-
wit-bindgen-rt = { workspace = true }
1110

1211
[lib]
1312
crate-type = ["cdylib"]

crates/extensions/tedge_wasm_mapper/components/rust/te_to_c8y/src/lib.rs

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,4 @@
11
use serde::Deserialize;
2-
use serde::Serialize;
32
use serde_json::json;
43
use std::collections::HashMap;
54

@@ -96,7 +95,7 @@ impl Guest for Component {
9695
}
9796
}
9897

99-
#[derive(Deserialize, Serialize)]
98+
#[derive(Deserialize)]
10099
struct Measurements {
101100
#[serde(skip_serializing_if = "Option::is_none")]
102101
time: Option<String>,
@@ -105,7 +104,7 @@ struct Measurements {
105104
pub extras: HashMap<String, Measurement>,
106105
}
107106

108-
#[derive(Deserialize, Serialize)]
107+
#[derive(Deserialize)]
109108
#[serde(untagged)]
110109
enum Measurement {
111110
Number(serde_json::Number),

crates/extensions/tedge_wasm_mapper/src/config.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ impl StageConfig {
5454
};
5555
let config_topics = topic_filters(&self.config_topics)?;
5656
Ok(Stage {
57-
filter: Box::new(filter),
57+
filter,
5858
config_topics,
5959
})
6060
}

crates/extensions/tedge_wasm_mapper/src/engine.rs

Lines changed: 33 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,15 @@
1+
use crate::pipeline::Filter;
12
use crate::wasm::Datetime;
23
use crate::wasm::Message;
34
use crate::wasm::TransformedMessages;
45
use crate::wasm::WasmFilter;
6+
use crate::wasm_filter::WasmFilterResource;
57
use crate::LoadError;
68
use camino::Utf8Path;
79
use camino::Utf8PathBuf;
810
use std::collections::HashMap;
11+
use tedge_mqtt_ext::MqttMessage;
12+
use tedge_mqtt_ext::Topic;
913
use wasmtime::component::Component;
1014
use wasmtime::component::Linker;
1115
use wasmtime::component::ResourceTable;
@@ -43,7 +47,19 @@ impl HostEngine {
4347
Ok(())
4448
}
4549

46-
pub fn instantiate(&self, component: &Utf8Path) -> Result<WasmFilter, LoadError> {
50+
pub fn instantiate(&self, component: &Utf8Path) -> Result<Box<dyn Filter>, LoadError> {
51+
self.instantiate_filter(component)
52+
.map(WasmFilter::into_dyn)
53+
.or_else(|_| {
54+
// FIXME: should read the config file
55+
let config_topic = Topic::new_unchecked("config");
56+
let no_config = MqttMessage::new(&config_topic, "");
57+
self.instantiate_resource(component, &no_config)
58+
.map(WasmFilterResource::into_dyn)
59+
})
60+
}
61+
62+
pub fn instantiate_filter(&self, component: &Utf8Path) -> Result<WasmFilter, LoadError> {
4763
let Some(component) = self.components.get(component) else {
4864
return Err(LoadError::FileNotFound {
4965
path: component.into(),
@@ -62,6 +78,22 @@ impl HostEngine {
6278

6379
Ok(WasmFilter::new(store, process_func))
6480
}
81+
82+
pub fn instantiate_resource(
83+
&self,
84+
component: &Utf8Path,
85+
config: &MqttMessage,
86+
) -> Result<WasmFilterResource, LoadError> {
87+
let Some(component) = self.components.get(component) else {
88+
return Err(LoadError::FileNotFound {
89+
path: component.into(),
90+
});
91+
};
92+
93+
let state = HostState::default();
94+
let store = Store::new(&self.engine, state);
95+
WasmFilterResource::try_new(store, component, &self.linker, config)
96+
}
6597
}
6698

6799
pub struct HostState {

0 commit comments

Comments
 (0)