@@ -28,6 +28,7 @@ SOFTWARE.
28
28
#include < algorithm>
29
29
#include < cstdint>
30
30
#include < cstring>
31
+ #include < memory>
31
32
#include < stdexcept>
32
33
#include < vector>
33
34
@@ -649,27 +650,34 @@ void MqttClient::setupSubscriptions() {
649
650
} catch (rclcpp::exceptions::RCLError& e) {
650
651
RCLCPP_ERROR (get_logger (), " Failed to create generic subscriber: %s" ,
651
652
e.what ());
652
- return ;
653
+ continue ;
653
654
}
654
655
RCLCPP_INFO (get_logger (), " Subscribed ROS topic '%s' of type '%s'" ,
655
656
ros_topic.c_str (), ros2mqtt.ros .msg_type .c_str ());
656
657
657
- }
658
- if (all_topics_and_types.count (ros_topic)) {
658
+ } else {
659
+ const auto &pubs = get_publishers_info_by_topic (ros_topic);
660
+
661
+ if (pubs.empty ()) continue ;
659
662
663
+ const auto endpointInfo = pubs.front ();
660
664
// check if message type has changed or if mapping is stale
661
- const std::string& msg_type = all_topics_and_types.at (ros_topic)[0 ];
665
+ const std::string msg_type = endpointInfo.topic_type ();
666
+
662
667
if (msg_type == ros2mqtt.ros .msg_type && !ros2mqtt.ros .is_stale ) continue ;
663
668
ros2mqtt.ros .is_stale = false ;
664
669
ros2mqtt.ros .msg_type = msg_type;
665
670
666
671
try {
672
+ auto qos = endpointInfo.qos_profile ();
667
673
ros2mqtt.ros .subscriber = create_generic_subscription (
668
- ros_topic, msg_type, ros2mqtt.ros .queue_size , bound_callback_func);
674
+ ros_topic, msg_type, qos.keep_last (ros2mqtt.ros .queue_size ),
675
+ bound_callback_func);
676
+
669
677
} catch (rclcpp::exceptions::RCLError& e) {
670
678
RCLCPP_ERROR (get_logger (), " Failed to create generic subscriber: %s" ,
671
679
e.what ());
672
- return ;
680
+ continue ;
673
681
}
674
682
RCLCPP_INFO (get_logger (), " Subscribed ROS topic '%s' of type '%s'" ,
675
683
ros_topic.c_str (), msg_type.c_str ());
0 commit comments