Skip to content

Commit 6d510fc

Browse files
committed
Use the tedge mqtt actor for entity deregistration
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
1 parent 04e7402 commit 6d510fc

File tree

9 files changed

+404
-292
lines changed

9 files changed

+404
-292
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/core/tedge_actors/src/test_helpers.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -259,7 +259,6 @@ where
259259
}
260260
}
261261

262-
#[allow(clippy::needless_collect)] // To avoid issues with Send constraints
263262
async fn assert_received<Samples>(&mut self, expected: Samples)
264263
where
265264
Samples: IntoIterator + Send,

crates/core/tedge_agent/src/agent.rs

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,6 @@ use tedge_log_manager::LogManagerConfig;
5858
use tedge_log_manager::LogManagerOptions;
5959
use tedge_mqtt_ext::MqttActorBuilder;
6060
use tedge_mqtt_ext::MqttConfig;
61-
use tedge_mqtt_ext::MqttDynamicConnector;
6261
use tedge_mqtt_ext::TopicFilter;
6362
use tedge_script_ext::ScriptActor;
6463
use tedge_signal_ext::SignalActor;
@@ -388,7 +387,6 @@ impl Agent {
388387
let entity_store_server = EntityStoreServer::new(
389388
entity_store,
390389
mqtt_schema.clone(),
391-
Box::new(MqttDynamicConnector::new(self.config.mqtt_config)),
392390
&mut mqtt_actor_builder,
393391
self.config.entity_auto_register,
394392
);

crates/core/tedge_agent/src/entity_manager/server.rs

Lines changed: 93 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,22 @@
1+
use std::convert::Infallible;
2+
13
use async_trait::async_trait;
24
use serde_json::Map;
35
use serde_json::Value;
6+
use tedge_actors::Actor;
7+
use tedge_actors::Builder;
48
use tedge_actors::LoggingSender;
9+
use tedge_actors::MappingSender;
10+
use tedge_actors::MessageReceiver;
511
use tedge_actors::MessageSink;
12+
use tedge_actors::MessageSource;
13+
use tedge_actors::NoConfig;
14+
use tedge_actors::NoMessage;
15+
use tedge_actors::RuntimeError;
616
use tedge_actors::Sender;
717
use tedge_actors::Server;
18+
use tedge_actors::SimpleMessageBox;
19+
use tedge_actors::SimpleMessageBoxBuilder;
820
use tedge_api::entity::EntityMetadata;
921
use tedge_api::entity_store;
1022
use tedge_api::entity_store::EntityRegistrationMessage;
@@ -18,11 +30,10 @@ use tedge_api::mqtt_topics::EntityTopicId;
1830
use tedge_api::mqtt_topics::MqttSchema;
1931
use tedge_api::pending_entity_store::RegisteredEntityData;
2032
use tedge_api::EntityStore;
21-
use tedge_mqtt_ext::MqttConnector;
33+
use tedge_mqtt_ext::DynSubscriptionsInner;
2234
use tedge_mqtt_ext::MqttMessage;
35+
use tedge_mqtt_ext::MqttRequest;
2336
use tedge_mqtt_ext::TopicFilter;
24-
use tokio::time::timeout;
25-
use tokio::time::Duration;
2637
use tracing::error;
2738

2839
#[derive(Debug)]
@@ -56,27 +67,94 @@ pub enum EntityStoreResponse {
5667
pub struct EntityStoreServer {
5768
entity_store: EntityStore,
5869
mqtt_schema: MqttSchema,
59-
mqtt_connector: Box<dyn MqttConnector>,
6070
mqtt_publisher: LoggingSender<MqttMessage>,
6171
entity_auto_register: bool,
72+
retain_requests: SimpleMessageBox<NoMessage, TopicFilter>,
73+
}
74+
75+
struct DeregistrationActorBuilder {
76+
mqtt_publish: LoggingSender<MqttMessage>,
77+
messages: SimpleMessageBoxBuilder<MqttMessage, NoMessage>,
78+
}
79+
80+
impl Builder<DeregistrationActor> for DeregistrationActorBuilder {
81+
type Error = Infallible;
82+
83+
fn try_build(self) -> Result<DeregistrationActor, Self::Error> {
84+
Ok(DeregistrationActor {
85+
mqtt_publish: self.mqtt_publish,
86+
messages: self.messages.build(),
87+
})
88+
}
89+
}
90+
91+
struct DeregistrationActor {
92+
messages: SimpleMessageBox<MqttMessage, NoMessage>,
93+
mqtt_publish: LoggingSender<MqttMessage>,
94+
}
95+
96+
#[async_trait::async_trait]
97+
impl Actor for DeregistrationActor {
98+
fn name(&self) -> &str {
99+
todo!()
100+
}
101+
102+
async fn run(mut self) -> Result<(), RuntimeError> {
103+
while let Ok(Some(msg)) = self.messages.try_recv().await {
104+
if msg.retain && !msg.payload.as_bytes().is_empty() {
105+
let clear_msg = MqttMessage::new(&msg.topic, "").with_retain();
106+
self.mqtt_publish.send(clear_msg).await.unwrap();
107+
}
108+
}
109+
Ok(())
110+
}
111+
}
112+
113+
impl MessageSink<MqttMessage> for DeregistrationActorBuilder {
114+
fn get_sender(&self) -> tedge_actors::DynSender<MqttMessage> {
115+
self.messages.get_sender()
116+
}
62117
}
63118

64119
impl EntityStoreServer {
65-
pub fn new(
120+
pub fn new<M>(
66121
entity_store: EntityStore,
67122
mqtt_schema: MqttSchema,
68-
mqtt_connector: Box<dyn MqttConnector>,
69-
mqtt_actor: &mut impl MessageSink<MqttMessage>,
123+
mqtt_actor: &mut M,
70124
entity_auto_register: bool,
71-
) -> Self {
72-
let mqtt_publisher = LoggingSender::new("MqttPublisher".into(), mqtt_actor.get_sender());
125+
) -> Self
126+
where
127+
M: MessageSink<MqttRequest>
128+
+ for<'a> MessageSource<MqttMessage, &'a mut DynSubscriptionsInner>,
129+
{
130+
let mqtt_publisher = LoggingSender::new(
131+
"MqttPublisher".into(),
132+
Box::new(MappingSender::new(mqtt_actor.get_sender(), |msg| {
133+
[MqttRequest::Publish(msg)]
134+
})),
135+
);
136+
let mut retain_requests = SimpleMessageBoxBuilder::new("DeregistrationClient", 16);
137+
let mut dyn_subs = DynSubscriptionsInner::new(TopicFilter::empty());
138+
let messages = SimpleMessageBoxBuilder::new("DeregistrationActor", 16);
139+
let dereg_actor_builder = DeregistrationActorBuilder {
140+
mqtt_publish: mqtt_publisher.clone(),
141+
messages,
142+
};
143+
mqtt_actor.connect_sink(&mut dyn_subs, &dereg_actor_builder);
144+
let client_id = dyn_subs.client_id();
145+
mqtt_actor.connect_mapped_source(NoConfig, &mut retain_requests, move |topics| {
146+
[MqttRequest::RetrieveRetain(client_id, topics)]
147+
});
148+
149+
// TODO - no don't do this!
150+
tokio::spawn(dereg_actor_builder.build().run());
73151

74152
Self {
75153
entity_store,
76154
mqtt_schema,
77-
mqtt_connector,
78155
mqtt_publisher,
79156
entity_auto_register,
157+
retain_requests: retain_requests.build(),
80158
}
81159
}
82160

@@ -321,6 +399,10 @@ impl EntityStoreServer {
321399
return deleted;
322400
}
323401

402+
if deleted.is_empty() {
403+
return vec![];
404+
}
405+
324406
let mut topics = TopicFilter::empty();
325407
for entity in deleted.iter() {
326408
for channel_filter in [
@@ -340,33 +422,7 @@ impl EntityStoreServer {
340422
}
341423
}
342424

343-
// A single connection to retrieve all retained metadata messages for all deleted entities
344-
match self.mqtt_connector.connect(topics.clone()).await {
345-
Ok(mut connection) => {
346-
while let Ok(Some(message)) =
347-
timeout(Duration::from_secs(1), connection.next_message()).await
348-
{
349-
if message.retain
350-
&& !message.payload_bytes().is_empty()
351-
&& topics.accept(&message)
352-
{
353-
let clear_msg = MqttMessage::new(&message.topic, "").with_retain();
354-
if let Err(err) = self.mqtt_publisher.send(clear_msg).await {
355-
error!(
356-
"Failed to clear retained message on topic {} while de-registering {} due to: {err}",
357-
topic_id,
358-
message.topic
359-
);
360-
}
361-
}
362-
}
363-
364-
connection.disconnect().await;
365-
}
366-
Err(err) => {
367-
error!("Failed to create MQTT connection for clearing entity data: {err}");
368-
}
369-
}
425+
self.retain_requests.send(topics).await.unwrap();
370426

371427
// Clear the entity metadata of all deleted entities bottom up
372428
for entity in deleted.iter().rev() {

0 commit comments

Comments
 (0)