Skip to content

Commit 9ab65d9

Browse files
committed
Add test to ensure that cloning MQTT configurations doesn't share subscriptions
Signed-off-by: James Rhodes <jarhodes314@gmail.com>
1 parent 03ef4c5 commit 9ab65d9

File tree

2 files changed

+70
-4
lines changed

2 files changed

+70
-4
lines changed

crates/common/mqtt_channel/src/tests.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ use futures::SinkExt;
33
use futures::StreamExt;
44
use std::convert::TryInto;
55
use std::time::Duration;
6+
use tokio::time::timeout;
67

78
const TIMEOUT: Duration = Duration::from_millis(1000);
89

@@ -568,3 +569,49 @@ async fn assert_payload_received(con: &mut Connection, payload: &'static str) {
568569
not_msg => panic!("Expected message to be received, got {not_msg:?}"),
569570
}
570571
}
572+
573+
#[tokio::test]
574+
async fn connections_from_cloned_configs_are_independent() -> Result<(), anyhow::Error> {
575+
// This test arose from an issue with dynamic subscriptions where
576+
// subscriptions were shared between different MQTT channel instances
577+
let broker = mqtt_tests::test_mqtt_broker();
578+
let mqtt_config = Config::default().with_port(broker.port);
579+
let mqtt_config_cloned = mqtt_config.clone();
580+
581+
let topic = uniquify!("a/test/topic");
582+
let other_topic = uniquify!("different/test/topic");
583+
let mqtt_config = mqtt_config.with_subscriptions(TopicFilter::new_unchecked(topic));
584+
let mqtt_config_cloned =
585+
mqtt_config_cloned.with_subscriptions(TopicFilter::new_unchecked(other_topic));
586+
587+
let mut con = Connection::new(&mqtt_config).await?;
588+
let mut other_con = Connection::new(&mqtt_config_cloned).await?;
589+
590+
// Any messages published on that topic ...
591+
broker.publish(topic, "original topic message").await?;
592+
broker.publish(other_topic, "other topic message").await?;
593+
594+
// ... must be received by the client
595+
assert_eq!(
596+
MaybeMessage::Next(message(topic, "original topic message")),
597+
next_message(&mut con.received).await
598+
);
599+
assert_eq!(
600+
MaybeMessage::Next(message(other_topic, "other topic message")),
601+
next_message(&mut other_con.received).await
602+
);
603+
604+
assert!(
605+
timeout(Duration::from_millis(200), next_message(&mut con.received))
606+
.await
607+
.is_err()
608+
);
609+
assert!(timeout(
610+
Duration::from_millis(200),
611+
next_message(&mut other_con.received)
612+
)
613+
.await
614+
.is_err());
615+
616+
Ok(())
617+
}

crates/extensions/tedge_mqtt_ext/src/tests.rs

Lines changed: 23 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -255,8 +255,14 @@ async fn dynamic_subscribers_receive_retain_messages() {
255255
let mqtt_config = MqttConfig::default().with_port(broker.port);
256256
let mut mqtt = MqttActorBuilder::new(mqtt_config);
257257

258-
broker.publish_with_opts("a/b", "retain", QoS::AtLeastOnce, true).await.unwrap();
259-
broker.publish_with_opts("b/c", "retain", QoS::AtLeastOnce, true).await.unwrap();
258+
broker
259+
.publish_with_opts("a/b", "retain", QoS::AtLeastOnce, true)
260+
.await
261+
.unwrap();
262+
broker
263+
.publish_with_opts("b/c", "retain", QoS::AtLeastOnce, true)
264+
.await
265+
.unwrap();
260266

261267
let mut client_0 = SimpleMessageBoxBuilder::<_, PublishOrSubscribe>::new("dyn-subscriber", 16);
262268
let mut client_1 = SimpleMessageBoxBuilder::<_, PublishOrSubscribe>::new("dyn-subscriber1", 16);
@@ -275,14 +281,27 @@ async fn dynamic_subscribers_receive_retain_messages() {
275281
// client_0 receives retain message upon subscribing to "a/b"
276282
assert_eq!(timeout(client_0.recv()).await.unwrap(), msg);
277283

278-
client_1.send(PublishOrSubscribe::Subscribe(SubscriptionRequest { diff: SubscriptionDiff { subscribe: ["a/b".into(), "b/c".into()].into(), unsubscribe: [].into() }, client_id: client_id_1 })).await.unwrap();
284+
client_1
285+
.send(PublishOrSubscribe::Subscribe(SubscriptionRequest {
286+
diff: SubscriptionDiff {
287+
subscribe: ["a/b".into(), "b/c".into()].into(),
288+
unsubscribe: [].into(),
289+
},
290+
client_id: client_id_1,
291+
}))
292+
.await
293+
.unwrap();
279294

280295
// client_1 should receive both "a/b" and "b/c" retain messages upon subscribing
281296
let recv = timeout(client_1.recv()).await.unwrap();
282297
let recv2 = timeout(client_1.recv()).await.unwrap();
283298

284299
// Retain message should not be redelivered to client_0
285-
assert!(tokio::time::timeout(Duration::from_millis(200), client_0.recv()).await.is_err());
300+
assert!(
301+
tokio::time::timeout(Duration::from_millis(200), client_0.recv())
302+
.await
303+
.is_err()
304+
);
286305

287306
if recv.topic.name == "a/b" {
288307
assert_eq!(recv, msg);

0 commit comments

Comments
 (0)