Skip to content

Commit 2664022

Browse files
committed
publish primitive messages via ShapeShifter
1 parent 57a134a commit 2664022

File tree

2 files changed

+126
-6
lines changed

2 files changed

+126
-6
lines changed

include/mqtt_client/MqttClient.h

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,24 @@ class MqttClient : public nodelet::Nodelet,
192192
void mqtt2ros(mqtt::const_message_ptr mqtt_msg,
193193
const ros::WallTime& arrival_stamp = ros::WallTime::now());
194194

195+
/**
196+
* @brief Publishes a primitive message received via MQTT to ROS.
197+
*
198+
* This tries to interpret the raw MQTT message as a bool, int, or float value
199+
* in the given order before falling back to string. The message is then
200+
* published as a corresponding primitive ROS message. This utilizes the
201+
* ShapeShifter stored for the MQTT topic on which the message was received.
202+
* The ShapeShifter is dynamically configured to the appropriate ROS message
203+
* type.
204+
*
205+
* The following mappings from primitive type to ROS message type hold:
206+
* bool: std_msgs/Bool
207+
* int: std_msgs/Int32
208+
* float: std_msgs/Float32
209+
* string: std_msgs/String
210+
*
211+
* @param mqtt_msg MQTT message
212+
*/
195213
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg);
196214

197215
/**
@@ -443,4 +461,22 @@ bool MqttClient::loadParameter(const std::string& key, T& value,
443461
return found;
444462
}
445463

464+
465+
/**
466+
* Serializes a ROS message to a buffer.
467+
*
468+
* @tparam T ROS message type
469+
*
470+
* @param[in] msg ROS message
471+
* @param[out] buffer buffer to serialize to
472+
*/
473+
template <typename T>
474+
void serializeRosMessage(const T& msg, std::vector<uint8_t>& buffer) {
475+
476+
const uint32_t length = ros::serialization::serializationLength(msg);
477+
buffer.resize(length);
478+
ros::serialization::OStream stream(buffer.data(), length);
479+
ros::serialization::serialize(stream, msg);
480+
}
481+
446482
} // namespace mqtt_client

src/MqttClient.cpp

Lines changed: 90 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,12 @@ SOFTWARE.
3333
#include <mqtt_client/MqttClient.h>
3434
#include <mqtt_client/RosMsgType.h>
3535
#include <pluginlib/class_list_macros.h>
36+
#include <ros/message_traits.h>
37+
#include <std_msgs/Bool.h>
38+
#include <std_msgs/Float32.h>
3639
#include <std_msgs/Float64.h>
40+
#include <std_msgs/Int32.h>
41+
#include <std_msgs/String.h>
3742
#include <xmlrpcpp/XmlRpcException.h>
3843
#include <xmlrpcpp/XmlRpcValue.h>
3944

@@ -537,18 +542,37 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
537542

538543
void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
539544

540-
bool found_primitive = false;
545+
std::string mqtt_topic = mqtt_msg->get_topic();
546+
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
541547
const std::string str_msg = mqtt_msg->to_string();
542548

549+
bool found_primitive = false;
550+
std::string msg_type_md5;
551+
std::string msg_type_name;
552+
std::string msg_type_definition;
553+
std::vector<uint8_t> msg_buffer;
554+
543555
// check for bool
544556
if (!found_primitive) {
545557
std::string bool_str = str_msg;
546558
std::transform(str_msg.cbegin(), str_msg.cend(), bool_str.begin(),
547559
::tolower);
548560
if (bool_str == "true" || bool_str == "false") {
549-
found_primitive = true;
561+
550562
bool bool_msg = (bool_str == "true");
551-
NODELET_INFO("Got bool: %d", bool_msg);
563+
564+
// construct and serialize ROS message
565+
std_msgs::Bool msg;
566+
msg.data = bool_msg;
567+
serializeRosMessage(msg, msg_buffer);
568+
569+
// collect ROS message type information
570+
msg_type_md5 = ros::message_traits::MD5Sum<std_msgs::Bool>::value();
571+
msg_type_name = ros::message_traits::DataType<std_msgs::Bool>::value();
572+
msg_type_definition =
573+
ros::message_traits::Definition<std_msgs::Bool>::value();
574+
575+
found_primitive = true;
552576
}
553577
}
554578

@@ -558,8 +582,19 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
558582
try {
559583
const int int_msg = std::stoi(str_msg, &pos);
560584
if (pos == str_msg.size()) {
585+
586+
// construct and serialize ROS message
587+
std_msgs::Int32 msg;
588+
msg.data = int_msg;
589+
serializeRosMessage(msg, msg_buffer);
590+
591+
// collect ROS message type information
592+
msg_type_md5 = ros::message_traits::MD5Sum<std_msgs::Int32>::value();
593+
msg_type_name = ros::message_traits::DataType<std_msgs::Int32>::value();
594+
msg_type_definition =
595+
ros::message_traits::Definition<std_msgs::Int32>::value();
596+
561597
found_primitive = true;
562-
NODELET_INFO("Got int: %d", int_msg);
563598
}
564599
} catch (const std::invalid_argument& ex) {
565600
} catch (const std::out_of_range& ex) {
@@ -572,17 +607,66 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
572607
try {
573608
const float float_msg = std::stof(str_msg, &pos);
574609
if (pos == str_msg.size()) {
610+
611+
// construct and serialize ROS message
612+
std_msgs::Float32 msg;
613+
msg.data = float_msg;
614+
serializeRosMessage(msg, msg_buffer);
615+
616+
// collect ROS message type information
617+
msg_type_md5 = ros::message_traits::MD5Sum<std_msgs::Float32>::value();
618+
msg_type_name =
619+
ros::message_traits::DataType<std_msgs::Float32>::value();
620+
msg_type_definition =
621+
ros::message_traits::Definition<std_msgs::Float32>::value();
622+
575623
found_primitive = true;
576-
NODELET_INFO("Got float: %f", float_msg);
577624
}
578625
} catch (const std::invalid_argument& ex) {
579626
} catch (const std::out_of_range& ex) {
580627
}
581628
}
582629

630+
// fall back to string
583631
if (!found_primitive) {
584-
NODELET_INFO("Got string: %s", str_msg.c_str());
632+
633+
// construct and serialize ROS message
634+
std_msgs::String msg;
635+
msg.data = str_msg;
636+
serializeRosMessage(msg, msg_buffer);
637+
638+
// collect ROS message type information
639+
msg_type_md5 = ros::message_traits::MD5Sum<std_msgs::String>::value();
640+
msg_type_name = ros::message_traits::DataType<std_msgs::String>::value();
641+
msg_type_definition =
642+
ros::message_traits::Definition<std_msgs::String>::value();
643+
}
644+
645+
// if ROS message type has changed
646+
if (msg_type_md5 != mqtt2ros.ros.shape_shifter.getMD5Sum()) {
647+
648+
// configure ShapeShifter
649+
mqtt2ros.ros.shape_shifter.morph(msg_type_md5, msg_type_name,
650+
msg_type_definition, "");
651+
652+
// advertise with ROS publisher
653+
mqtt2ros.ros.publisher.shutdown();
654+
mqtt2ros.ros.publisher = mqtt2ros.ros.shape_shifter.advertise(
655+
node_handle_, mqtt2ros.ros.topic, mqtt2ros.ros.queue_size,
656+
mqtt2ros.ros.latched);
657+
658+
NODELET_INFO("ROS publisher message type on topic '%s' set to '%s'",
659+
mqtt2ros.ros.topic.c_str(), msg_type_name.c_str());
585660
}
661+
662+
// publish via ShapeShifter
663+
ros::serialization::OStream msg_stream(msg_buffer.data(), msg_buffer.size());
664+
NODELET_DEBUG(
665+
"Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
666+
mqtt2ros.ros.shape_shifter.getDataType().c_str(),
667+
mqtt2ros.ros.topic.c_str());
668+
mqtt2ros.ros.shape_shifter.read(msg_stream);
669+
mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
586670
}
587671

588672

0 commit comments

Comments
 (0)