Skip to content

Commit 5664833

Browse files
committed
Use the tedge mqtt actor for entity deregistration
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
1 parent dc3ac6b commit 5664833

File tree

8 files changed

+403
-288
lines changed

8 files changed

+403
-288
lines changed

crates/core/tedge_actors/src/test_helpers.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ where
259259
}
260260
}
261261

262-
#[allow(clippy::needless_collect)] // To avoid issues with Send constraints
263262
async fn assert_received<Samples>(&mut self, expected: Samples)
264263
where
265264
Samples: IntoIterator + Send,

crates/core/tedge_agent/src/agent.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ use tedge_log_manager::LogManagerConfig;
5858
use tedge_log_manager::LogManagerOptions;
5959
use tedge_mqtt_ext::MqttActorBuilder;
6060
use tedge_mqtt_ext::MqttConfig;
61-
use tedge_mqtt_ext::MqttDynamicConnector;
6261
use tedge_mqtt_ext::TopicFilter;
6362
use tedge_script_ext::ScriptActor;
6463
use tedge_signal_ext::SignalActor;
@@ -388,7 +387,6 @@ impl Agent {
388387
let entity_store_server = EntityStoreServer::new(
389388
entity_store,
390389
mqtt_schema.clone(),
391-
Box::new(MqttDynamicConnector::new(self.config.mqtt_config)),
392390
&mut mqtt_actor_builder,
393391
self.config.entity_auto_register,
394392
);

crates/core/tedge_agent/src/entity_manager/server.rs

Lines changed: 93 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1+
use std::convert::Infallible;
2+
13
use async_trait::async_trait;
24
use serde_json::Map;
35
use serde_json::Value;
6+
use tedge_actors::Actor;
7+
use tedge_actors::Builder;
48
use tedge_actors::LoggingSender;
9+
use tedge_actors::MappingSender;
10+
use tedge_actors::MessageReceiver;
511
use tedge_actors::MessageSink;
12+
use tedge_actors::MessageSource;
13+
use tedge_actors::NoConfig;
14+
use tedge_actors::NoMessage;
15+
use tedge_actors::RuntimeError;
616
use tedge_actors::Sender;
717
use tedge_actors::Server;
18+
use tedge_actors::SimpleMessageBox;
19+
use tedge_actors::SimpleMessageBoxBuilder;
820
use tedge_api::entity::EntityMetadata;
921
use tedge_api::entity_store;
1022
use tedge_api::entity_store::EntityRegistrationMessage;
@@ -18,11 +30,10 @@ use tedge_api::mqtt_topics::EntityTopicId;
1830
use tedge_api::mqtt_topics::MqttSchema;
1931
use tedge_api::pending_entity_store::RegisteredEntityData;
2032
use tedge_api::EntityStore;
21-
use tedge_mqtt_ext::MqttConnector;
33+
use tedge_mqtt_ext::DynSubscriptionsInner;
2234
use tedge_mqtt_ext::MqttMessage;
35+
use tedge_mqtt_ext::MqttRequest;
2336
use tedge_mqtt_ext::TopicFilter;
24-
use tokio::time::timeout;
25-
use tokio::time::Duration;
2637
use tracing::error;
2738

2839
#[derive(Debug)]
@@ -56,27 +67,94 @@ pub enum EntityStoreResponse {
5667
pub struct EntityStoreServer {
5768
entity_store: EntityStore,
5869
mqtt_schema: MqttSchema,
59-
mqtt_connector: Box<dyn MqttConnector>,
6070
mqtt_publisher: LoggingSender<MqttMessage>,
6171
entity_auto_register: bool,
72+
retain_requests: SimpleMessageBox<NoMessage, TopicFilter>,
73+
}
74+
75+
struct DeregistrationActorBuilder {
76+
mqtt_publish: LoggingSender<MqttMessage>,
77+
messages: SimpleMessageBoxBuilder<MqttMessage, NoMessage>,
78+
}
79+
80+
impl Builder<DeregistrationActor> for DeregistrationActorBuilder {
81+
type Error = Infallible;
82+
83+
fn try_build(self) -> Result<DeregistrationActor, Self::Error> {
84+
Ok(DeregistrationActor {
85+
mqtt_publish: self.mqtt_publish,
86+
messages: self.messages.build(),
87+
})
88+
}
89+
}
90+
91+
struct DeregistrationActor {
92+
messages: SimpleMessageBox<MqttMessage, NoMessage>,
93+
mqtt_publish: LoggingSender<MqttMessage>,
94+
}
95+
96+
#[async_trait::async_trait]
97+
impl Actor for DeregistrationActor {
98+
fn name(&self) -> &str {
99+
todo!()
100+
}
101+
102+
async fn run(mut self) -> Result<(), RuntimeError> {
103+
while let Ok(Some(msg)) = self.messages.try_recv().await {
104+
if msg.retain && !msg.payload.as_bytes().is_empty() {
105+
let clear_msg = MqttMessage::new(&msg.topic, "").with_retain();
106+
self.mqtt_publish.send(clear_msg).await.unwrap();
107+
}
108+
}
109+
Ok(())
110+
}
111+
}
112+
113+
impl MessageSink<MqttMessage> for DeregistrationActorBuilder {
114+
fn get_sender(&self) -> tedge_actors::DynSender<MqttMessage> {
115+
self.messages.get_sender()
116+
}
62117
}
63118

64119
impl EntityStoreServer {
65-
pub fn new(
120+
pub fn new<M>(
66121
entity_store: EntityStore,
67122
mqtt_schema: MqttSchema,
68-
mqtt_connector: Box<dyn MqttConnector>,
69-
mqtt_actor: &mut impl MessageSink<MqttMessage>,
123+
mqtt_actor: &mut M,
70124
entity_auto_register: bool,
71-
) -> Self {
72-
let mqtt_publisher = LoggingSender::new("MqttPublisher".into(), mqtt_actor.get_sender());
125+
) -> Self
126+
where
127+
M: MessageSink<MqttRequest>
128+
+ for<'a> MessageSource<MqttMessage, &'a mut DynSubscriptionsInner>,
129+
{
130+
let mqtt_publisher = LoggingSender::new(
131+
"MqttPublisher".into(),
132+
Box::new(MappingSender::new(mqtt_actor.get_sender(), |msg| {
133+
[MqttRequest::Publish(msg)]
134+
})),
135+
);
136+
let mut retain_requests = SimpleMessageBoxBuilder::new("DeregistrationClient", 16);
137+
let mut dyn_subs = DynSubscriptionsInner::new(TopicFilter::empty());
138+
let messages = SimpleMessageBoxBuilder::new("DeregistrationActor", 16);
139+
let dereg_actor_builder = DeregistrationActorBuilder {
140+
mqtt_publish: mqtt_publisher.clone(),
141+
messages,
142+
};
143+
mqtt_actor.connect_sink(&mut dyn_subs, &dereg_actor_builder);
144+
let client_id = dyn_subs.client_id();
145+
mqtt_actor.connect_mapped_source(NoConfig, &mut retain_requests, move |topics| {
146+
[MqttRequest::RetrieveRetain(client_id, topics)]
147+
});
148+
149+
// TODO - no don't do this!
150+
tokio::spawn(dereg_actor_builder.build().run());
73151

74152
Self {
75153
entity_store,
76154
mqtt_schema,
77-
mqtt_connector,
78155
mqtt_publisher,
79156
entity_auto_register,
157+
retain_requests: retain_requests.build(),
80158
}
81159
}
82160

@@ -314,6 +392,10 @@ impl EntityStoreServer {
314392
async fn deregister_entity(&mut self, topic_id: &EntityTopicId) -> Vec<EntityMetadata> {
315393
let deleted = self.entity_store.deregister_entity(topic_id);
316394

395+
if deleted.is_empty() {
396+
return vec![];
397+
}
398+
317399
let mut topics = TopicFilter::empty();
318400
for entity in deleted.iter() {
319401
for channel_filter in [
@@ -332,30 +414,7 @@ impl EntityStoreServer {
332414
}
333415
}
334416

335-
// A single connection to retrieve all retained metadata messages for all deleted entities
336-
match self.mqtt_connector.connect(topics).await {
337-
Ok(mut connection) => {
338-
while let Ok(Some(message)) =
339-
timeout(Duration::from_secs(1), connection.next_message()).await
340-
{
341-
if message.retain && !message.payload_bytes().is_empty() {
342-
let clear_msg = MqttMessage::new(&message.topic, "").with_retain();
343-
if let Err(err) = self.mqtt_publisher.send(clear_msg).await {
344-
error!(
345-
"Failed to clear retained message on topic {} while de-registering {} due to: {err}",
346-
topic_id,
347-
message.topic
348-
);
349-
}
350-
}
351-
}
352-
353-
connection.disconnect().await;
354-
}
355-
Err(err) => {
356-
error!("Failed to create MQTT connection for clearing entity data: {err}");
357-
}
358-
}
417+
self.retain_requests.send(topics).await.unwrap();
359418

360419
// Clear the entity metadata of all deleted entities bottom up
361420
for entity in deleted.iter().rev() {

0 commit comments

Comments
 (0)