Skip to content

Commit ac991c0

Browse files
authored
Merge pull request #3681 from albinsuresh/fix/3632/track-all-entity-retained-messages-v2
fix: clear all retained entity messages on entity deregistration
2 parents 5214cac + 405330d commit ac991c0

File tree

6 files changed

+239
-71
lines changed

6 files changed

+239
-71
lines changed

crates/core/tedge_agent/src/agent.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ use tedge_log_manager::LogManagerConfig;
5858
use tedge_log_manager::LogManagerOptions;
5959
use tedge_mqtt_ext::MqttActorBuilder;
6060
use tedge_mqtt_ext::MqttConfig;
61+
use tedge_mqtt_ext::MqttDynamicConnector;
6162
use tedge_mqtt_ext::TopicFilter;
6263
use tedge_script_ext::ScriptActor;
6364
use tedge_signal_ext::SignalActor;
@@ -271,7 +272,7 @@ impl Agent {
271272
let mut restart_actor_builder = RestartManagerBuilder::new(self.config.restart_config);
272273

273274
// Mqtt actor
274-
let mut mqtt_actor_builder = MqttActorBuilder::new(self.config.mqtt_config);
275+
let mut mqtt_actor_builder = MqttActorBuilder::new(self.config.mqtt_config.clone());
275276

276277
// Software update actor
277278
let mut software_update_builder = SoftwareManagerBuilder::new(self.config.sw_update_config);
@@ -387,6 +388,7 @@ impl Agent {
387388
let entity_store_server = EntityStoreServer::new(
388389
entity_store,
389390
mqtt_schema.clone(),
391+
Box::new(MqttDynamicConnector::new(self.config.mqtt_config)),
390392
&mut mqtt_actor_builder,
391393
self.config.entity_auto_register,
392394
);

crates/core/tedge_agent/src/entity_manager/server.rs

Lines changed: 54 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,13 +12,17 @@ use tedge_api::entity_store::EntityTwinMessage;
1212
use tedge_api::entity_store::EntityUpdateMessage;
1313
use tedge_api::entity_store::ListFilters;
1414
use tedge_api::mqtt_topics::Channel;
15+
use tedge_api::mqtt_topics::ChannelFilter;
16+
use tedge_api::mqtt_topics::EntityFilter;
1517
use tedge_api::mqtt_topics::EntityTopicId;
1618
use tedge_api::mqtt_topics::MqttSchema;
1719
use tedge_api::pending_entity_store::RegisteredEntityData;
1820
use tedge_api::EntityStore;
21+
use tedge_mqtt_ext::MqttConnector;
1922
use tedge_mqtt_ext::MqttMessage;
20-
use tedge_mqtt_ext::QoS;
2123
use tedge_mqtt_ext::TopicFilter;
24+
use tokio::time::timeout;
25+
use tokio::time::Duration;
2226
use tracing::error;
2327

2428
#[derive(Debug)]
@@ -52,6 +56,7 @@ pub enum EntityStoreResponse {
5256
pub struct EntityStoreServer {
5357
entity_store: EntityStore,
5458
mqtt_schema: MqttSchema,
59+
mqtt_connector: Box<dyn MqttConnector>,
5560
mqtt_publisher: LoggingSender<MqttMessage>,
5661
entity_auto_register: bool,
5762
}
@@ -60,6 +65,7 @@ impl EntityStoreServer {
6065
pub fn new(
6166
entity_store: EntityStore,
6267
mqtt_schema: MqttSchema,
68+
mqtt_connector: Box<dyn MqttConnector>,
6369
mqtt_actor: &mut impl MessageSink<MqttMessage>,
6470
entity_auto_register: bool,
6571
) -> Self {
@@ -68,6 +74,7 @@ impl EntityStoreServer {
6874
Self {
6975
entity_store,
7076
mqtt_schema,
77+
mqtt_connector,
7178
mqtt_publisher,
7279
entity_auto_register,
7380
}
@@ -306,24 +313,56 @@ impl EntityStoreServer {
306313

307314
async fn deregister_entity(&mut self, topic_id: &EntityTopicId) -> Vec<EntityMetadata> {
308315
let deleted = self.entity_store.deregister_entity(topic_id);
309-
for entity in deleted.iter().rev() {
310-
for twin_key in entity.twin_data.keys() {
311-
let topic = self.mqtt_schema.topic_for(
312-
&entity.topic_id,
313-
&Channel::EntityTwinData {
314-
fragment_key: twin_key.to_string(),
315-
},
316-
);
317-
let clear_twin_msg = MqttMessage::new(&topic, "").with_retain();
318-
319-
self.publish_message(clear_twin_msg).await;
316+
317+
let mut topics = TopicFilter::empty();
318+
for entity in deleted.iter() {
319+
for channel_filter in [
320+
ChannelFilter::MeasurementMetadata,
321+
ChannelFilter::EventMetadata,
322+
ChannelFilter::AlarmMetadata,
323+
ChannelFilter::Alarm,
324+
ChannelFilter::EntityTwinData,
325+
ChannelFilter::AnyCommand,
326+
ChannelFilter::AnyCommandMetadata,
327+
] {
328+
let topic = self
329+
.mqtt_schema
330+
.topics(EntityFilter::Entity(&entity.topic_id), channel_filter);
331+
topics.add_all(topic);
332+
}
333+
}
334+
335+
// A single connection to retrieve all retained metadata messages for all deleted entities
336+
match self.mqtt_connector.connect(topics).await {
337+
Ok(mut connection) => {
338+
while let Ok(Some(message)) =
339+
timeout(Duration::from_secs(1), connection.next_message()).await
340+
{
341+
if message.retain && !message.payload_bytes().is_empty() {
342+
let clear_msg = MqttMessage::new(&message.topic, "").with_retain();
343+
if let Err(err) = self.mqtt_publisher.send(clear_msg).await {
344+
error!(
345+
"Failed to clear retained message on topic {} while de-registering {} due to: {err}",
346+
topic_id,
347+
message.topic
348+
);
349+
}
350+
}
351+
}
352+
353+
connection.disconnect().await;
354+
}
355+
Err(err) => {
356+
error!("Failed to create MQTT connection for clearing entity data: {err}");
320357
}
358+
}
359+
360+
// Clear the entity metadata of all deleted entities bottom up
361+
for entity in deleted.iter().rev() {
321362
let topic = self
322363
.mqtt_schema
323364
.topic_for(&entity.topic_id, &Channel::EntityMetadata);
324-
let clear_entity_msg = MqttMessage::new(&topic, "")
325-
.with_retain()
326-
.with_qos(QoS::AtLeastOnce);
365+
let clear_entity_msg = MqttMessage::new(&topic, "").with_retain();
327366

328367
self.publish_message(clear_entity_msg).await;
329368
}

0 commit comments

Comments
 (0)