Skip to content

Commit 7aa1354

Browse files
authored
Merge pull request #3714 from rina23q/refactor/remove-outdated-messages-from-availability-actor
refactor: cleanup code and outdated comments from the availability actor
2 parents 70331cd + bd6db14 commit 7aa1354

File tree

4 files changed

+43
-51
lines changed

4 files changed

+43
-51
lines changed

crates/extensions/c8y_mapper_ext/src/availability/actor.rs

Lines changed: 23 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,11 @@
11
use crate::availability::AvailabilityConfig;
22
use crate::availability::AvailabilityInput;
33
use crate::availability::AvailabilityOutput;
4-
use crate::availability::C8yJsonInventoryUpdate;
4+
use crate::availability::C8yJsonEmptyInventoryUpdate;
55
use crate::availability::C8ySmartRestSetInterval117;
66
use crate::availability::TimerStart;
77
use async_trait::async_trait;
88
use c8y_api::smartrest::topic::C8yTopic;
9-
use serde_json::json;
109
use std::collections::HashMap;
1110
use tedge_actors::Actor;
1211
use tedge_actors::LoggingSender;
@@ -156,44 +155,40 @@ impl AvailabilityActor {
156155
/// Insert a <"device topic ID" - "health entity topic ID" and "external ID"> pair into the map.
157156
/// If @health is provided in the registration message, use the value as long as it's valid as entity topic ID.
158157
/// If @health is not provided, use the "tedge-agent" service topic ID as default.
159-
/// @id is the only source to know the device's external ID. Hence, @id must be provided in the registration message.
160158
fn update_device_service_pair(
161159
&mut self,
162160
registration_message: &EntityRegistrationMessage,
163161
) -> RegistrationResult {
164162
let source = &registration_message.topic_id;
165163

166-
let result = match registration_message.health_endpoint.clone() {
164+
let maybe_health_topic_id = match registration_message.health_endpoint.clone() {
167165
None => registration_message
168166
.topic_id
169-
.default_service_for_device("tedge-agent")
170-
.ok_or_else( || format!("The entity is not in the default topic scheme. Please specify '@health' to enable availability monitoring for the device '{source}'")),
171-
Some(topic_id) => Ok(topic_id),
167+
.default_service_for_device("tedge-agent"),
168+
Some(topic_id) => Some(topic_id),
172169
};
173170

174-
match result {
175-
Ok(health_topic_id) => match registration_message.external_id.clone() {
176-
None => RegistrationResult::Skip(format!("Registration message is skipped since '@id' is missing in the payload. source: '{source}', {registration_message:?}")),
177-
Some(external_id) => {
178-
if let Some(ids) = self.device_ids_map.get(source) {
179-
if ids.health_topic_id == health_topic_id {
180-
return RegistrationResult::Skip(format!("Registration message is skipped since no health endpoint change is detected. source: '{source}', {registration_message:?}"))
181-
}
171+
match (maybe_health_topic_id, registration_message.external_id.clone()) {
172+
(None, _) => RegistrationResult::Error(format!("The entity is not in the default topic scheme. Please specify '@health' to enable availability monitoring for the device '{source}'")),
173+
(_, None) => RegistrationResult::Error(format!("External ID is missing from the received registration message. source: '{source}', {registration_message:?}")),
174+
(Some(health_topic_id), Some(external_id)) => {
175+
if let Some(ids) = self.device_ids_map.get(source) {
176+
if ids.health_topic_id == health_topic_id {
177+
return RegistrationResult::Skip(format!("Registration message is skipped since no health endpoint change is detected. source: '{source}', {registration_message:?}"))
182178
}
179+
}
183180

184-
match self.device_ids_map.insert(
185-
source.clone(),
186-
DeviceIds {
187-
health_topic_id,
188-
external_id,
189-
},
190-
) {
191-
None => RegistrationResult::New,
192-
Some(_) => RegistrationResult::Update,
193-
}
181+
match self.device_ids_map.insert(
182+
source.clone(),
183+
DeviceIds {
184+
health_topic_id,
185+
external_id,
186+
},
187+
) {
188+
None => RegistrationResult::New,
189+
Some(_) => RegistrationResult::Update,
194190
}
195-
},
196-
Err(err) => RegistrationResult::Error(err),
191+
}
197192
}
198193
}
199194

@@ -264,9 +259,8 @@ impl AvailabilityActor {
264259
if let Some(health_status) = self.health_status_map.get(service_topic_id) {
265260
// Send an empty JSON over MQTT message if the target service status is "up"
266261
if health_status.status == Status::Up {
267-
let json_over_mqtt = C8yJsonInventoryUpdate {
262+
let json_over_mqtt = C8yJsonEmptyInventoryUpdate {
268263
external_id: external_id.into(),
269-
payload: json!({}),
270264
prefix: self.config.c8y_prefix.clone(),
271265
};
272266
self.message_box.send(json_over_mqtt.into()).await?;

crates/extensions/c8y_mapper_ext/src/availability/builder.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ impl AvailabilityBuilder {
8686
fn mqtt_message_builder() -> impl Fn(AvailabilityOutput) -> Option<MqttMessage> {
8787
move |res| match res {
8888
AvailabilityOutput::C8ySmartRestSetInterval117(value) => Some(value.into()),
89-
AvailabilityOutput::C8yJsonInventoryUpdate(value) => Some(value.into()),
89+
AvailabilityOutput::C8yJsonEmptyInventoryUpdate(value) => Some(value.into()),
9090
}
9191
}
9292
}

crates/extensions/c8y_mapper_ext/src/availability/mod.rs

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
use crate::availability::actor::TimerPayload;
2+
use crate::inventory::inventory_update_topic;
23
pub use builder::AvailabilityBuilder;
34
use c8y_api::smartrest::inventory::C8ySmartRestSetInterval117;
5+
use serde_json::json;
46
use std::time::Duration;
57
use tedge_actors::fan_in_message_type;
68
use tedge_api::entity::EntityExternalId;
@@ -12,7 +14,6 @@ use tedge_config::models::TopicPrefix;
1214
use tedge_config::tedge_toml::ReadError;
1315
use tedge_config::TEdgeConfig;
1416
use tedge_mqtt_ext::MqttMessage;
15-
use tedge_mqtt_ext::Topic;
1617
use tedge_timer_ext::SetTimeout;
1718
use tedge_timer_ext::Timeout;
1819

@@ -26,27 +27,18 @@ pub type TimerComplete = Timeout<TimerPayload>;
2627
pub type SourceHealthStatus = (EntityTopicId, HealthStatus);
2728

2829
fan_in_message_type!(AvailabilityInput[EntityRegistrationMessage, SourceHealthStatus, TimerComplete] : Debug);
29-
fan_in_message_type!(AvailabilityOutput[C8ySmartRestSetInterval117, C8yJsonInventoryUpdate] : Debug);
30+
fan_in_message_type!(AvailabilityOutput[C8ySmartRestSetInterval117, C8yJsonEmptyInventoryUpdate] : Debug);
3031

31-
// TODO! Make it generic and move to c8y_api crate while refactoring c8y-mapper
3232
#[derive(Debug)]
33-
pub struct C8yJsonInventoryUpdate {
33+
pub struct C8yJsonEmptyInventoryUpdate {
3434
external_id: String,
35-
payload: serde_json::Value,
36-
pub prefix: TopicPrefix,
35+
prefix: TopicPrefix,
3736
}
3837

39-
impl From<C8yJsonInventoryUpdate> for MqttMessage {
40-
fn from(value: C8yJsonInventoryUpdate) -> Self {
41-
let json_over_mqtt_topic = format!(
42-
"{prefix}/inventory/managedObjects/update/{external_id}",
43-
prefix = value.prefix,
44-
external_id = value.external_id
45-
);
46-
MqttMessage::new(
47-
&Topic::new_unchecked(&json_over_mqtt_topic),
48-
value.payload.to_string(),
49-
)
38+
impl From<C8yJsonEmptyInventoryUpdate> for MqttMessage {
39+
fn from(value: C8yJsonEmptyInventoryUpdate) -> Self {
40+
let json_over_mqtt_topic = inventory_update_topic(&value.prefix, &value.external_id);
41+
MqttMessage::new(&json_over_mqtt_topic, json!({}).to_string())
5042
}
5143
}
5244

crates/extensions/c8y_mapper_ext/src/inventory.rs

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ use tedge_api::entity::EntityType;
1111
use tedge_api::entity_store::EntityTwinMessage;
1212
use tedge_api::mqtt_topics::Channel;
1313
use tedge_api::mqtt_topics::EntityTopicId;
14+
use tedge_config::models::TopicPrefix;
1415
use tedge_mqtt_ext::MqttMessage;
1516
use tedge_mqtt_ext::Topic;
1617
use tracing::info;
@@ -178,13 +179,18 @@ impl CumulocityConverter {
178179
source: &EntityTopicId,
179180
) -> Result<Topic, ConversionError> {
180181
let entity_external_id = self.entity_cache.try_get(source)?.external_id.as_ref();
181-
Ok(Topic::new_unchecked(&format!(
182-
"{prefix}/{INVENTORY_MANAGED_OBJECTS_TOPIC}/{entity_external_id}",
183-
prefix = self.config.bridge_config.c8y_prefix,
184-
)))
182+
let topic =
183+
inventory_update_topic(&self.config.bridge_config.c8y_prefix, entity_external_id);
184+
Ok(topic)
185185
}
186186
}
187187

188+
pub fn inventory_update_topic(prefix: &TopicPrefix, external_id: &str) -> Topic {
189+
Topic::new_unchecked(&format!(
190+
"{prefix}/{INVENTORY_MANAGED_OBJECTS_TOPIC}/{external_id}",
191+
))
192+
}
193+
188194
#[cfg(test)]
189195
mod tests {
190196
use crate::converter::tests::create_c8y_converter;

0 commit comments

Comments
 (0)