Skip to content

Commit aada479

Browse files
committed
Ensure dynamic connection is only created when needed
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
1 parent 9ab65d9 commit aada479

File tree

3 files changed

+90
-19
lines changed

3 files changed

+90
-19
lines changed

crates/common/mqtt_channel/src/topics.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -191,6 +191,10 @@ impl TopicFilter {
191191
None
192192
}
193193
}
194+
195+
pub fn is_empty(&self) -> bool {
196+
self.patterns.is_empty()
197+
}
194198
}
195199

196200
impl TryInto<Topic> for &str {

crates/extensions/tedge_mqtt_ext/src/lib.rs

Lines changed: 40 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -326,7 +326,7 @@ impl FromPeers {
326326
match message {
327327
PublishOrSubscribe::Publish(message) => {
328328
tracing::debug!(target: "MQTT pub", "{message}");
329-
SinkExt::send(outgoing_mqtt, message)
329+
SinkExt::send(outgoing_mqtt, message)
330330
.await
331331
.map_err(Box::new)?;
332332
}
@@ -365,18 +365,29 @@ impl FromPeers {
365365
client.unsubscribe_many(diff.unsubscribe).await.unwrap();
366366
}
367367
});
368-
let dynamic_connection_config = self.base_config.clone().with_subscriptions(tf);
369-
let mut sender = tx_to_peers.clone();
370-
tokio::spawn(async move {
371-
let mut conn = mqtt_channel::Connection::new(&dynamic_connection_config).await.unwrap();
372-
while let Ok(msg) = tokio::time::timeout(Duration::from_secs(10), conn.received.next()).await {
373-
if let Some(msg) = msg {
374-
if msg.retain {
375-
SinkExt::send(&mut sender, (request.client_id, msg)).await.unwrap();
368+
if !tf.is_empty() {
369+
let dynamic_connection_config =
370+
self.base_config.clone().with_subscriptions(tf);
371+
let mut sender = tx_to_peers.clone();
372+
tokio::spawn(async move {
373+
let mut conn =
374+
mqtt_channel::Connection::new(&dynamic_connection_config)
375+
.await
376+
.unwrap();
377+
while let Ok(msg) =
378+
tokio::time::timeout(Duration::from_secs(10), conn.received.next())
379+
.await
380+
{
381+
if let Some(msg) = msg {
382+
if msg.retain {
383+
SinkExt::send(&mut sender, (request.client_id, msg))
384+
.await
385+
.unwrap();
386+
}
376387
}
377388
}
378-
}
379-
});
389+
});
390+
}
380391
}
381392
}
382393
}
@@ -510,9 +521,13 @@ impl Actor for MqttActor {
510521
tokio::spawn(async move { self.trie_service.run().await });
511522

512523
tedge_utils::futures::select(
513-
self.from_peers
514-
.relay_messages_to(&mut mqtt_client.published, &mut to_peer, mqtt_client.subscriptions),
515-
self.to_peers.relay_messages_from(&mut mqtt_client.received, &mut from_peer),
524+
self.from_peers.relay_messages_to(
525+
&mut mqtt_client.published,
526+
&mut to_peer,
527+
mqtt_client.subscriptions,
528+
),
529+
self.to_peers
530+
.relay_messages_from(&mut mqtt_client.received, &mut from_peer),
516531
)
517532
.await
518533
}
@@ -624,12 +639,12 @@ mod unit_tests {
624639
id: 0
625640
))
626641
.await;
627-
642+
628643
actor
629644
.subscribe_client
630645
.assert_unsubscribed_from(["a/b".into()])
631646
.await;
632-
647+
633648
actor.close().await;
634649
}
635650

@@ -752,9 +767,15 @@ mod unit_tests {
752767
let subscribe_client = MockSubscriberOps::default();
753768
let from_peers = {
754769
let client = subscribe_client.clone();
755-
tokio::spawn(async move { fp.relay_messages_to(&mut outgoing_mqtt, &mut tx, client).await })
770+
tokio::spawn(async move {
771+
fp.relay_messages_to(&mut outgoing_mqtt, &mut tx, client)
772+
.await
773+
})
756774
};
757-
let to_peers = tokio::spawn(async move { tp.relay_messages_from(&mut incoming_messages, &mut rx).await });
775+
let to_peers = tokio::spawn(async move {
776+
tp.relay_messages_from(&mut incoming_messages, &mut rx)
777+
.await
778+
});
758779

759780
Self {
760781
subscribe_client,
@@ -771,7 +792,7 @@ mod unit_tests {
771792

772793
/// Closes the channels associated with this actor and waits for both
773794
/// loops to finish executing
774-
///
795+
///
775796
/// This allows the `SubscriberOps::drop` implementation to reliably
776797
/// flag any unasserted communication
777798
pub async fn close(mut self) {

crates/extensions/tedge_mqtt_ext/src/tests.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -312,6 +312,52 @@ async fn dynamic_subscribers_receive_retain_messages() {
312312
}
313313
}
314314

315+
#[tokio::test]
316+
async fn dynamic_subscribers_receive_retain_messages_when_upgrading_topic() {
317+
let broker = mqtt_tests::test_mqtt_broker();
318+
let mqtt_config = MqttConfig::default().with_port(broker.port);
319+
let mut mqtt = MqttActorBuilder::new(mqtt_config);
320+
321+
broker
322+
.publish_with_opts("a/b", "retain", QoS::AtLeastOnce, true)
323+
.await
324+
.unwrap();
325+
326+
let mut client_0 = SimpleMessageBoxBuilder::<_, PublishOrSubscribe>::new("dyn-subscriber", 16);
327+
let mut client_1 = SimpleMessageBoxBuilder::<_, PublishOrSubscribe>::new("dyn-subscriber1", 16);
328+
let _client_id_0 = mqtt.connect_id_sink(TopicFilter::new_unchecked("a/b"), &client_0);
329+
let client_id_1 = mqtt.connect_id_sink(TopicFilter::empty(), &client_1);
330+
client_0.connect_sink(NoConfig, &mqtt);
331+
client_1.connect_sink(NoConfig, &mqtt);
332+
let mqtt = mqtt.build();
333+
tokio::spawn(async move { mqtt.run().await.unwrap() });
334+
let mut client_0 = client_0.build();
335+
let mut client_1 = client_1.build();
336+
337+
let msg = MqttMessage::new(&Topic::new_unchecked("a/b"), "retain").with_retain();
338+
339+
// client_0 receives retain message upon subscribing to "a/b"
340+
assert_eq!(timeout(client_0.recv()).await.unwrap(), msg);
341+
342+
client_1
343+
.send(PublishOrSubscribe::Subscribe(SubscriptionRequest {
344+
diff: SubscriptionDiff {
345+
subscribe: ["a/+".into()].into(),
346+
unsubscribe: [].into(),
347+
},
348+
client_id: client_id_1,
349+
}))
350+
.await
351+
.unwrap();
352+
353+
// client_1 should receive both "a/b" and "b/c" retain messages upon subscribing
354+
let recv = timeout(client_1.recv()).await.unwrap();
355+
assert_eq!(recv, msg);
356+
357+
// Retain message might be redelivered to client_0, but this is
358+
// implementation dependent, don't assert either way
359+
}
360+
315361
async fn timeout<T>(fut: impl Future<Output = T>) -> T {
316362
tokio::time::timeout(Duration::from_secs(1), fut)
317363
.await

0 commit comments

Comments
 (0)