@@ -532,3 +532,62 @@ async fn ensure_that_last_will_message_is_delivered() -> Result<(), anyhow::Erro
532
532
. await ;
533
533
Ok ( ( ) )
534
534
}
535
+
536
+ #[ tokio:: test]
537
+ #[ serial]
538
+ async fn test_retain_message_delivery ( ) -> Result < ( ) , anyhow:: Error > {
539
+ // Given an MQTT broker
540
+ let broker = mqtt_tests:: test_mqtt_broker ( ) ;
541
+ let mqtt_config = Config :: default ( ) . with_port ( broker. port ) ;
542
+
543
+ let topic = "retained/topic" ;
544
+ let mqtt_config = mqtt_config. with_subscriptions ( topic. try_into ( ) ?) ;
545
+
546
+ // A client that subsribes to a topic.
547
+ let mut first_subscriber = Connection :: new ( & mqtt_config) . await ?;
548
+
549
+ //Raise retained alarm message
550
+ broker
551
+ . publish_with_opts (
552
+ "retained/topic" ,
553
+ "a retained message" ,
554
+ QoS :: AtLeastOnce ,
555
+ true ,
556
+ )
557
+ . await
558
+ . unwrap ( ) ;
559
+
560
+ //Expect the non-empty retained message to be delivered to first_subscriber
561
+ assert_eq ! (
562
+ MaybeMessage :: Next ( message( topic, "a retained message" ) ) ,
563
+ next_message( & mut first_subscriber. received) . await
564
+ ) ;
565
+
566
+ //Clear the last raised retained message
567
+ broker
568
+ . publish_with_opts (
569
+ "retained/topic" ,
570
+ "" , //Empty message indicates clear
571
+ QoS :: AtLeastOnce ,
572
+ true ,
573
+ )
574
+ . await
575
+ . unwrap ( ) ;
576
+
577
+ // Connect to the broker with the same session id
578
+ let mut second_subscriber = Connection :: new ( & mqtt_config) . await ?;
579
+
580
+ //Expect no messages to be delivered to this second_subscriber as the retained message is already cleared
581
+ assert_eq ! (
582
+ MaybeMessage :: Timeout ,
583
+ next_message( & mut second_subscriber. received) . await
584
+ ) ;
585
+
586
+ //Expect the empty retained message to be delivered to first_subscriber
587
+ assert_eq ! (
588
+ MaybeMessage :: Next ( message( topic, "" ) ) ,
589
+ next_message( & mut first_subscriber. received) . await
590
+ ) ;
591
+
592
+ Ok ( ( ) )
593
+ }
0 commit comments