Skip to content

Commit 1a80753

Browse files
committed
allow to publish primitive mqtt messages
1 parent 19eeb76 commit 1a80753

File tree

3 files changed

+185
-65
lines changed

3 files changed

+185
-65
lines changed

README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,7 @@ bridge:
193193
ros2mqtt: # array specifying which ROS topics to map to which MQTT topics
194194
- ros_topic: # ROS topic whose messages are transformed to MQTT messages
195195
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
196+
primitive: # [false] whether to publish as primitive message
196197
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
197198
advanced:
198199
ros:

include/mqtt_client/MqttClient.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -348,6 +348,7 @@ class MqttClient : public nodelet::Nodelet,
348348
int qos = 0; ///< MQTT QoS value
349349
bool retained = false; ///< whether to retain MQTT message
350350
} mqtt; ///< MQTT-related variables
351+
bool primitive = false; ///< whether to publish as primitive message
351352
bool stamped = false; ///< whether to inject timestamp in MQTT message
352353
};
353354

src/MqttClient.cpp

Lines changed: 183 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,18 @@ SOFTWARE.
3535
#include <pluginlib/class_list_macros.h>
3636
#include <ros/message_traits.h>
3737
#include <std_msgs/Bool.h>
38+
#include <std_msgs/Char.h>
3839
#include <std_msgs/Float32.h>
3940
#include <std_msgs/Float64.h>
41+
#include <std_msgs/Int16.h>
4042
#include <std_msgs/Int32.h>
43+
#include <std_msgs/Int64.h>
44+
#include <std_msgs/Int8.h>
4145
#include <std_msgs/String.h>
46+
#include <std_msgs/UInt16.h>
47+
#include <std_msgs/UInt32.h>
48+
#include <std_msgs/UInt64.h>
49+
#include <std_msgs/UInt8.h>
4250
#include <xmlrpcpp/XmlRpcException.h>
4351
#include <xmlrpcpp/XmlRpcValue.h>
4452

@@ -55,6 +63,87 @@ const std::string MqttClient::kRosMsgTypeMqttTopicPrefix =
5563
const std::string MqttClient::kLatencyRosTopicPrefix = "latencies/";
5664

5765

66+
/**
67+
* @brief Extracts string of primitive data types from ROS message.
68+
*
69+
* This is helpful to extract the actual data payload of a primitive ROS
70+
* message. If e.g. an std_msgs/String is serialized to a string
71+
* representation, it also contains the field name 'data'. This function
72+
* instead returns the underlying value as string.
73+
*
74+
* The following primitive ROS message types are supported:
75+
* std_msgs/String
76+
* std_msgs/Bool
77+
* std_msgs/Char
78+
* std_msgs/UInt8
79+
* std_msgs/UInt16
80+
* std_msgs/UInt32
81+
* std_msgs/UInt64
82+
* std_msgs/Int8
83+
* std_msgs/Int16
84+
* std_msgs/Int32
85+
* std_msgs/Int64
86+
* std_msgs/Float32
87+
* std_msgs/Float64
88+
*
89+
* @param [in] msg generic ShapeShifter ROS message
90+
* @param [out] primitive string representation of primitive message data
91+
*
92+
* @return true if primitive ROS message type was found
93+
* @return false if ROS message type is not primitive
94+
*/
95+
bool primitiveRosMessageToString(const topic_tools::ShapeShifter::ConstPtr& msg,
96+
std::string& primitive) {
97+
98+
bool found_primitive = true;
99+
const std::string& msg_type_md5 = msg->getMD5Sum();
100+
101+
if (msg_type_md5 == ros::message_traits::MD5Sum<std_msgs::String>::value()) {
102+
primitive = msg->instantiate<std_msgs::String>()->data;
103+
} else if (msg_type_md5 ==
104+
ros::message_traits::MD5Sum<std_msgs::Bool>::value()) {
105+
primitive = msg->instantiate<std_msgs::Bool>()->data ? "true" : "false";
106+
} else if (msg_type_md5 ==
107+
ros::message_traits::MD5Sum<std_msgs::Char>::value()) {
108+
primitive = std::to_string(msg->instantiate<std_msgs::Char>()->data);
109+
} else if (msg_type_md5 ==
110+
ros::message_traits::MD5Sum<std_msgs::UInt8>::value()) {
111+
primitive = std::to_string(msg->instantiate<std_msgs::UInt8>()->data);
112+
} else if (msg_type_md5 ==
113+
ros::message_traits::MD5Sum<std_msgs::UInt16>::value()) {
114+
primitive = std::to_string(msg->instantiate<std_msgs::UInt16>()->data);
115+
} else if (msg_type_md5 ==
116+
ros::message_traits::MD5Sum<std_msgs::UInt32>::value()) {
117+
primitive = std::to_string(msg->instantiate<std_msgs::UInt32>()->data);
118+
} else if (msg_type_md5 ==
119+
ros::message_traits::MD5Sum<std_msgs::UInt64>::value()) {
120+
primitive = std::to_string(msg->instantiate<std_msgs::UInt64>()->data);
121+
} else if (msg_type_md5 ==
122+
ros::message_traits::MD5Sum<std_msgs::Int8>::value()) {
123+
primitive = std::to_string(msg->instantiate<std_msgs::Int8>()->data);
124+
} else if (msg_type_md5 ==
125+
ros::message_traits::MD5Sum<std_msgs::Int16>::value()) {
126+
primitive = std::to_string(msg->instantiate<std_msgs::Int16>()->data);
127+
} else if (msg_type_md5 ==
128+
ros::message_traits::MD5Sum<std_msgs::Int32>::value()) {
129+
primitive = std::to_string(msg->instantiate<std_msgs::Int32>()->data);
130+
} else if (msg_type_md5 ==
131+
ros::message_traits::MD5Sum<std_msgs::Int64>::value()) {
132+
primitive = std::to_string(msg->instantiate<std_msgs::Int64>()->data);
133+
} else if (msg_type_md5 ==
134+
ros::message_traits::MD5Sum<std_msgs::Float32>::value()) {
135+
primitive = std::to_string(msg->instantiate<std_msgs::Float32>()->data);
136+
} else if (msg_type_md5 ==
137+
ros::message_traits::MD5Sum<std_msgs::Float64>::value()) {
138+
primitive = std::to_string(msg->instantiate<std_msgs::Float64>()->data);
139+
} else {
140+
found_primitive = false;
141+
}
142+
143+
return found_primitive;
144+
}
145+
146+
58147
void MqttClient::onInit() {
59148

60149
// get nodehandles
@@ -139,9 +228,20 @@ void MqttClient::loadParameters() {
139228
Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic];
140229
ros2mqtt.mqtt.topic = std::string(ros2mqtt_params[k]["mqtt_topic"]);
141230

231+
// ros2mqtt[k]/primitive
232+
if (ros2mqtt_params[k].hasMember("primitive"))
233+
ros2mqtt.primitive = ros2mqtt_params[k]["primitive"];
234+
142235
// ros2mqtt[k]/inject_timestamp
143236
if (ros2mqtt_params[k].hasMember("inject_timestamp"))
144237
ros2mqtt.stamped = ros2mqtt_params[k]["inject_timestamp"];
238+
if (ros2mqtt.stamped && ros2mqtt.primitive) {
239+
NODELET_WARN(
240+
"Timestamp will not be injected into primitive messages on ROS "
241+
"topic '%s'",
242+
ros_topic.c_str());
243+
ros2mqtt.stamped = false;
244+
}
145245

146246
// ros2mqtt[k]/advanced
147247
if (ros2mqtt_params[k].hasMember("advanced")) {
@@ -159,7 +259,8 @@ void MqttClient::loadParameters() {
159259
}
160260
}
161261

162-
NODELET_INFO("Bridging ROS topic '%s' to MQTT topic '%s' %s",
262+
NODELET_INFO("Bridging %sROS topic '%s' to MQTT topic '%s' %s",
263+
ros2mqtt.primitive ? "primitive " : "",
163264
ros_topic.c_str(), ros2mqtt.mqtt.topic.c_str(),
164265
ros2mqtt.stamped ? "and measuring latency" : "");
165266
} else {
@@ -381,6 +482,8 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
381482
const std::string& ros_topic) {
382483

383484
Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic];
485+
std::string mqtt_topic = ros2mqtt.mqtt.topic;
486+
std::vector<uint8_t> payload_buffer;
384487

385488
// gather information on ROS message type in special ROS message
386489
RosMsgType ros_msg_type;
@@ -391,69 +494,84 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
391494
NODELET_DEBUG("Received ROS message of type '%s' on topic '%s'",
392495
ros_msg_type.name.c_str(), ros_topic.c_str());
393496

394-
// serialize ROS message type to buffer
395-
uint32_t msg_type_length =
396-
ros::serialization::serializationLength(ros_msg_type);
397-
std::vector<uint8_t> msg_type_buffer;
398-
msg_type_buffer.resize(msg_type_length);
399-
ros::serialization::OStream msg_type_stream(msg_type_buffer.data(),
400-
msg_type_length);
401-
ros::serialization::serialize(msg_type_stream, ros_msg_type);
402-
403-
// send ROS message type information to MQTT broker
404-
std::string mqtt_topic = kRosMsgTypeMqttTopicPrefix + ros2mqtt.mqtt.topic;
405-
try {
406-
NODELET_DEBUG("Sending ROS message type to MQTT broker on topic '%s' ...",
407-
mqtt_topic.c_str());
408-
mqtt::message_ptr mqtt_msg =
409-
mqtt::make_message(mqtt_topic, msg_type_buffer.data(), msg_type_length,
410-
ros2mqtt.mqtt.qos, true);
411-
client_->publish(mqtt_msg);
412-
} catch (const mqtt::exception& e) {
413-
NODELET_WARN(
414-
"Publishing ROS message type information to MQTT topic '%s' failed: %s",
415-
mqtt_topic.c_str(), e.what());
416-
}
417-
418-
// build MQTT payload for ROS message (R) as [1, S, R] or [0, R]
419-
// where first item = 1 if timestamp (S) is included
420-
uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
421-
uint32_t payload_length = 1 + msg_length;
422-
uint32_t stamp_length = ros::serialization::serializationLength(ros::Time());
423-
uint32_t msg_offset = 1;
424-
std::vector<uint8_t> payload_buffer;
425-
if (ros2mqtt.stamped) {
426-
// allocate buffer with appropriate size to hold [1, S, R]
427-
msg_offset += stamp_length;
428-
payload_length += stamp_length;
429-
payload_buffer.resize(payload_length);
430-
payload_buffer[0] = 1;
431-
} else {
432-
// allocate buffer with appropriate size to hold [0, R]
433-
payload_buffer.resize(payload_length);
434-
payload_buffer[0] = 0;
435-
}
436-
437-
// serialize ROS message to payload [0/1, -, R]
438-
ros::serialization::OStream msg_stream(&payload_buffer[msg_offset],
439-
msg_length);
440-
ros::serialization::serialize(msg_stream, *ros_msg);
441-
442-
// inject timestamp as final step
443-
if (ros2mqtt.stamped) {
444-
445-
// take current timestamp
446-
ros::WallTime stamp_wall = ros::WallTime::now();
447-
ros::Time stamp(stamp_wall.sec, stamp_wall.nsec);
448-
if (stamp.isZero())
497+
if (ros2mqtt.primitive) { // publish as primitive (string) message
498+
499+
// resolve ROS messages to primitive strings if possible, else serialize
500+
// entire message to string
501+
std::string payload;
502+
bool found_primitive = primitiveRosMessageToString(ros_msg, payload);
503+
if (found_primitive)
504+
payload_buffer = std::vector<uint8_t>(payload.begin(), payload.end());
505+
else
506+
serializeRosMessage(*ros_msg, payload_buffer);
507+
508+
} else { // publish as complete ROS message incl. ROS message type
509+
510+
// serialize ROS message type to buffer
511+
uint32_t msg_type_length =
512+
ros::serialization::serializationLength(ros_msg_type);
513+
std::vector<uint8_t> msg_type_buffer;
514+
msg_type_buffer.resize(msg_type_length);
515+
ros::serialization::OStream msg_type_stream(msg_type_buffer.data(),
516+
msg_type_length);
517+
ros::serialization::serialize(msg_type_stream, ros_msg_type);
518+
519+
// send ROS message type information to MQTT broker
520+
mqtt_topic = kRosMsgTypeMqttTopicPrefix + ros2mqtt.mqtt.topic;
521+
try {
522+
NODELET_DEBUG("Sending ROS message type to MQTT broker on topic '%s' ...",
523+
mqtt_topic.c_str());
524+
mqtt::message_ptr mqtt_msg =
525+
mqtt::make_message(mqtt_topic, msg_type_buffer.data(),
526+
msg_type_buffer.size(), ros2mqtt.mqtt.qos, true);
527+
client_->publish(mqtt_msg);
528+
} catch (const mqtt::exception& e) {
449529
NODELET_WARN(
450-
"Injected ROS time 0 into MQTT payload on topic %s, is a ROS clock "
451-
"running?",
452-
ros2mqtt.mqtt.topic.c_str());
530+
"Publishing ROS message type information to MQTT topic '%s' failed: %s",
531+
mqtt_topic.c_str(), e.what());
532+
}
453533

454-
// serialize timestamp to payload [1, S, R]
455-
ros::serialization::OStream stamp_stream(&payload_buffer[1], stamp_length);
456-
ros::serialization::serialize(stamp_stream, stamp);
534+
// build MQTT payload for ROS message (R) as [1, S, R] or [0, R]
535+
// where first item = 1 if timestamp (S) is included
536+
uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
537+
uint32_t payload_length = 1 + msg_length;
538+
uint32_t stamp_length =
539+
ros::serialization::serializationLength(ros::Time());
540+
uint32_t msg_offset = 1;
541+
if (ros2mqtt.stamped) {
542+
// allocate buffer with appropriate size to hold [1, S, R]
543+
msg_offset += stamp_length;
544+
payload_length += stamp_length;
545+
payload_buffer.resize(payload_length);
546+
payload_buffer[0] = 1;
547+
} else {
548+
// allocate buffer with appropriate size to hold [0, R]
549+
payload_buffer.resize(payload_length);
550+
payload_buffer[0] = 0;
551+
}
552+
553+
// serialize ROS message to payload [0/1, -, R]
554+
ros::serialization::OStream msg_stream(&payload_buffer[msg_offset],
555+
msg_length);
556+
ros::serialization::serialize(msg_stream, *ros_msg);
557+
558+
// inject timestamp as final step
559+
if (ros2mqtt.stamped) {
560+
561+
// take current timestamp
562+
ros::WallTime stamp_wall = ros::WallTime::now();
563+
ros::Time stamp(stamp_wall.sec, stamp_wall.nsec);
564+
if (stamp.isZero())
565+
NODELET_WARN(
566+
"Injected ROS time 0 into MQTT payload on topic %s, is a ROS clock "
567+
"running?",
568+
ros2mqtt.mqtt.topic.c_str());
569+
570+
// serialize timestamp to payload [1, S, R]
571+
ros::serialization::OStream stamp_stream(&payload_buffer[1],
572+
stamp_length);
573+
ros::serialization::serialize(stamp_stream, stamp);
574+
}
457575
}
458576

459577
// send ROS message to MQTT broker
@@ -462,9 +580,9 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
462580
NODELET_DEBUG(
463581
"Sending ROS message of type '%s' to MQTT broker on topic '%s' ...",
464582
ros_msg->getDataType().c_str(), mqtt_topic.c_str());
465-
mqtt::message_ptr mqtt_msg =
466-
mqtt::make_message(mqtt_topic, payload_buffer.data(), payload_length,
467-
ros2mqtt.mqtt.qos, ros2mqtt.mqtt.retained);
583+
mqtt::message_ptr mqtt_msg = mqtt::make_message(
584+
mqtt_topic, payload_buffer.data(), payload_buffer.size(),
585+
ros2mqtt.mqtt.qos, ros2mqtt.mqtt.retained);
468586
client_->publish(mqtt_msg);
469587
} catch (const mqtt::exception& e) {
470588
NODELET_WARN(

0 commit comments

Comments
 (0)