@@ -31,6 +31,7 @@ SOFTWARE.
31
31
#include < vector>
32
32
33
33
#include < mqtt_client/MqttClient.ros2.hpp>
34
+ #include < mqtt_client_interfaces/msg/ros_msg_type.hpp>
34
35
#include < rcpputils/env.hpp>
35
36
#include < std_msgs/msg/bool.hpp>
36
37
#include < std_msgs/msg/char.hpp>
@@ -516,40 +517,51 @@ void MqttClient::ros2mqtt(
516
517
const std::string& ros_topic) {
517
518
518
519
Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic];
519
- std::string& ros_msg_type = ros2mqtt.ros .msg_type ;
520
520
std::string mqtt_topic = ros2mqtt.mqtt .topic ;
521
521
std::vector<uint8_t > payload_buffer;
522
522
523
+ // gather information on ROS message type in special ROS message
524
+ mqtt_client_interfaces::msg::RosMsgType ros_msg_type;
525
+ ros_msg_type.name = ros2mqtt.ros .msg_type ;
526
+
523
527
RCLCPP_DEBUG (get_logger (), " Received ROS message of type '%s' on topic '%s'" ,
524
- ros_msg_type.c_str (), ros_topic.c_str ());
528
+ ros_msg_type.name . c_str (), ros_topic.c_str ());
525
529
526
530
if (ros2mqtt.primitive ) { // publish as primitive (string) message
527
531
528
532
// resolve ROS messages to primitive strings if possible
529
533
std::string payload;
530
534
bool found_primitive =
531
- primitiveRosMessageToString (serialized_msg, ros_msg_type, payload);
535
+ primitiveRosMessageToString (serialized_msg, ros_msg_type. name , payload);
532
536
if (found_primitive) {
533
537
payload_buffer = std::vector<uint8_t >(payload.begin (), payload.end ());
534
538
} else {
535
539
RCLCPP_WARN (get_logger (),
536
540
" Cannot send ROS message of type '%s' as primitive message, "
537
541
" check supported primitive types" ,
538
- ros_msg_type.c_str ());
542
+ ros_msg_type.name . c_str ());
539
543
return ;
540
544
}
541
545
542
546
} else { // publish as complete ROS message incl. ROS message type
543
547
548
+ // serialize ROS message type
549
+ rclcpp::SerializedMessage serialized_ros_msg_type;
550
+ serializeRosMessage (ros_msg_type, serialized_ros_msg_type);
551
+ uint32_t msg_type_length = serialized_ros_msg_type.size ();
552
+ std::vector<uint8_t > msg_type_buffer = std::vector<uint8_t >(
553
+ serialized_ros_msg_type.get_rcl_serialized_message ().buffer ,
554
+ serialized_ros_msg_type.get_rcl_serialized_message ().buffer + msg_type_length);
555
+
544
556
// send ROS message type information to MQTT broker
545
557
mqtt_topic = kRosMsgTypeMqttTopicPrefix + ros2mqtt.mqtt .topic ;
546
558
try {
547
559
RCLCPP_DEBUG (get_logger (),
548
560
" Sending ROS message type to MQTT broker on topic '%s' ..." ,
549
561
mqtt_topic.c_str ());
550
562
mqtt::message_ptr mqtt_msg =
551
- mqtt::make_message (mqtt_topic, ros_msg_type. c_str (),
552
- ros_msg_type .size (), ros2mqtt.mqtt .qos , true );
563
+ mqtt::make_message (mqtt_topic, msg_type_buffer. data (),
564
+ msg_type_buffer .size (), ros2mqtt.mqtt .qos , true );
553
565
client_->publish (mqtt_msg);
554
566
} catch (const mqtt::exception& e) {
555
567
RCLCPP_WARN (
@@ -605,7 +617,7 @@ void MqttClient::ros2mqtt(
605
617
RCLCPP_DEBUG (
606
618
get_logger (),
607
619
" Sending ROS message of type '%s' to MQTT broker on topic '%s' ..." ,
608
- ros_msg_type.c_str (), mqtt_topic.c_str ());
620
+ ros_msg_type.name . c_str (), mqtt_topic.c_str ());
609
621
mqtt::message_ptr mqtt_msg = mqtt::make_message (
610
622
mqtt_topic, payload_buffer.data (), payload_buffer.size (),
611
623
ros2mqtt.mqtt .qos , ros2mqtt.mqtt .retained );
@@ -638,10 +650,6 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
638
650
// if stamped, compute latency
639
651
if (stamped) {
640
652
641
- // create ROS message buffer on top of MQTT message payload
642
- char * non_const_payload = const_cast <char *>(&payload[1 ]);
643
- uint8_t * stamp_buffer = reinterpret_cast <uint8_t *>(non_const_payload);
644
-
645
653
// copy stamp to generic message buffer
646
654
rclcpp::SerializedMessage serialized_stamp (stamp_length_);
647
655
std::memcpy (serialized_stamp.get_rcl_serialized_message ().buffer ,
@@ -860,7 +868,17 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
860
868
mqtt_topic.find (kRosMsgTypeMqttTopicPrefix ) != std::string::npos;
861
869
if (msg_contains_ros_msg_type) {
862
870
863
- std::string ros_msg_type = mqtt_msg->to_string ();
871
+ // copy message type to generic message buffer
872
+ auto & payload = mqtt_msg->get_payload_ref ();
873
+ uint32_t payload_length = static_cast <uint32_t >(payload.size ());
874
+ rclcpp::SerializedMessage serialized_ros_msg_type (payload_length);
875
+ std::memcpy (serialized_ros_msg_type.get_rcl_serialized_message ().buffer ,
876
+ &(payload[0 ]), payload_length);
877
+ serialized_ros_msg_type.get_rcl_serialized_message ().buffer_length = payload_length;
878
+
879
+ // deserialize ROS message type
880
+ mqtt_client_interfaces::msg::RosMsgType ros_msg_type;
881
+ deserializeRosMessage (serialized_ros_msg_type, ros_msg_type);
864
882
865
883
// reconstruct corresponding MQTT data topic
866
884
std::string mqtt_data_topic = mqtt_topic;
@@ -869,17 +887,17 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
869
887
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_data_topic];
870
888
871
889
// if ROS message type has changed
872
- if (ros_msg_type != mqtt2ros.ros .msg_type ) {
890
+ if (ros_msg_type. name != mqtt2ros.ros .msg_type ) {
873
891
874
- mqtt2ros.ros .msg_type = ros_msg_type;
892
+ mqtt2ros.ros .msg_type = ros_msg_type. name ;
875
893
RCLCPP_INFO (get_logger (),
876
894
" ROS publisher message type on topic '%s' set to '%s'" ,
877
- mqtt2ros.ros .topic .c_str (), ros_msg_type.c_str ());
895
+ mqtt2ros.ros .topic .c_str (), ros_msg_type.name . c_str ());
878
896
879
897
// recreate generic publisher
880
898
try {
881
899
mqtt2ros.ros .publisher = create_generic_publisher (
882
- mqtt2ros.ros .topic , ros_msg_type, mqtt2ros.ros .queue_size );
900
+ mqtt2ros.ros .topic , ros_msg_type. name , mqtt2ros.ros .queue_size );
883
901
} catch (rclcpp::exceptions::RCLError& e) {
884
902
RCLCPP_ERROR (get_logger (), " Failed to create generic publisher: %s" ,
885
903
e.what ());
0 commit comments