Skip to content

Commit a98e5cb

Browse files
committed
move information about timestamp from payload to RosMsgType
1 parent 1182573 commit a98e5cb

File tree

6 files changed

+45
-49
lines changed

6 files changed

+45
-49
lines changed

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -366,15 +366,15 @@ Inside the callback, the generic messages received on the `ros_topic` are serial
366366

367367
Upon retrieval of an MQTT message, it is republished as a ROS message on the ROS network. To this end, [topic_tools::ShapeShifter::morph](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a2b74b522fb49dac05d48f466b6b87d2d) is used to have the ShapeShifter publisher take the shape of the specific ROS message type.
368368

369-
The required metainformation on the ROS message type can however only be extracted in the ROS subscriber callback of the publishing *mqtt_client* with calls to [topic_tools::ShapeShifter::getMD5Sum](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#af05fbf42757658e4c6a0f99ff72f7daa), [topic_tools::ShapeShifter::getDataType](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a9d57b2285213fda5e878ce7ebc42f0fb), and [topic_tools::ShapeShifter::getMessageDefinition](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a503af7234eeba0ccefca64c4d0cc1df0). These attributes are wrapped in a ROS message of custom type [mqtt_client::RosMsgType](msg/RosMsgType.msg), serialized using [ros::serialization](http://wiki.ros.org/roscpp/Overview/MessagesSerializationAndAdaptingTypes) and also shared via the MQTT broker on a special topic.
369+
The required metainformation on the ROS message type can however only be extracted in the ROS subscriber callback of the publishing *mqtt_client* with calls to [topic_tools::ShapeShifter::getMD5Sum](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#af05fbf42757658e4c6a0f99ff72f7daa), [topic_tools::ShapeShifter::getDataType](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a9d57b2285213fda5e878ce7ebc42f0fb), and [topic_tools::ShapeShifter::getMessageDefinition](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a503af7234eeba0ccefca64c4d0cc1df0). These attributes are wrapped in a ROS message of custom type [mqtt_client::RosMsgType](mqtt_client_interfaces/msg/RosMsgType.msg), serialized using [ros::serialization](http://wiki.ros.org/roscpp/Overview/MessagesSerializationAndAdaptingTypes) and also shared via the MQTT broker on a special topic.
370370

371371
When an *mqtt_client* receives such ROS message type metainformation, it configures the corresponding ROS ShapeShifter publisher using [topic_tools::ShapeShifter::morph](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a2b74b522fb49dac05d48f466b6b87d2d).
372372

373-
The *mqtt_client* also provides functionality to measure the latency of transferring a ROS message via an MQTT broker back to ROS. To this end, the sending client injects the current timestamp into the MQTT message. The receiving client can then compute the latency between message reception time and the injected timestamp. Since injection of the timestamp is optional, an extra bit of information is needed for the receiver to correctly decode the MQTT message. Therefore, the first entry in the `std::vector<uint8>` message buffer is used to indicate whether the message includes an injected timestamp. The resulting `std::vector<uint8>` payload takes on one of the following forms:
373+
The *mqtt_client* also provides functionality to measure the latency of transferring a ROS message via an MQTT broker back to ROS. To this end, the sending client injects the current timestamp into the MQTT message. The receiving client can then compute the latency between message reception time and the injected timestamp. The information about whether a timestamp is injected is also included in the custom [mqtt_client::RosMsgType](mqtt_client_interfaces/msg/RosMsgType.msg) message that is sent before. The actual `std::vector<uint8>` message payload takes on one of the following forms:
374374

375375
```txt
376-
[ 1 | ... serialized timestamp ... | ... serialized ROS messsage ...]
377-
[ 0 | ... serialized ROS messsage ...]
376+
[... serialized timestamp ... | ... serialized ROS messsage ...]
377+
[... serialized ROS messsage ...]
378378
```
379379

380380
To summarize, the dataflow is as follows:
@@ -384,12 +384,12 @@ To summarize, the dataflow is as follows:
384384
- ROS message type information is serialized and sent via the MQTT broker on MQTT topic `mqtt_client/ros_msg_type/<ros2mqtt_mqtt_topic>`
385385
- the actual ROS message is serialized
386386
- if `inject_timestamp`, the current timestamp is serialized and concatenated with the message
387-
- an integer is added to the message's head indicating whether a timestamp was injected
388387
- the actual MQTT message is sent via the MQTT broker on MQTT topic `<ros2mqtt_mqtt_topic>`
389388
- an MQTT message containing the ROS message type information is received on MQTT topic `mqtt_client/ros_msg_type/<ros2mqtt_mqtt_topic>`
390389
- message type information is extracted and the ShapeShifter ROS publisher is configured
390+
- information about whether a timestamp is injected is stored for the specific topic
391391
- an MQTT message containing the actual ROS message is received
392-
- depending on the first element of the message, it is decoded into the serialized ROS message and the serialized timestamp
392+
- depending on whether a timestamp is injected, it is decoded into the serialized ROS message and the serialized timestamp
393393
- if the message contained a timestamp, the latency is computed and published on ROS topic `~/latencies/<mqtt2ros_ros_topic>`
394394
- the serialized ROS message is published using the *ShapeShifter* on ROS topic `<mqtt2ros_ros_topic>`
395395

mqtt_client/include/mqtt_client/MqttClient.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -370,6 +370,7 @@ class MqttClient : public nodelet::Nodelet,
370370
} ros; ///< ROS-related variables
371371
bool primitive = false; ///< whether to publish as primitive message (if
372372
///< coming from non-ROS MQTT client)
373+
bool stamped = false; ///< whether timestamp is injected
373374
};
374375

375376
protected:

mqtt_client/include/mqtt_client/MqttClient.ros2.hpp

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -364,6 +364,7 @@ class MqttClient : public rclcpp::Node,
364364
} ros; ///< ROS-related variables
365365
bool primitive = false; ///< whether to publish as primitive message (if
366366
///< coming from non-ROS MQTT client)
367+
bool stamped = false; ///< whether timestamp is injected
367368
};
368369

369370
protected:

mqtt_client/src/MqttClient.cpp

Lines changed: 19 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -490,6 +490,7 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
490490
ros_msg_type.md5 = ros_msg->getMD5Sum();
491491
ros_msg_type.name = ros_msg->getDataType();
492492
ros_msg_type.definition = ros_msg->getMessageDefinition();
493+
ros_msg_type.stamped = ros2mqtt.stamped;
493494

494495
NODELET_DEBUG("Received ROS message of type '%s' on topic '%s'",
495496
ros_msg_type.name.c_str(), ros_topic.c_str());
@@ -534,26 +535,24 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
534535
mqtt_topic.c_str(), e.what());
535536
}
536537

537-
// build MQTT payload for ROS message (R) as [1, S, R] or [0, R]
538-
// where first item = 1 if timestamp (S) is included
538+
// build MQTT payload for ROS message (R) as [R]
539+
// or [S, R] if timestamp (S) is included
539540
uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
540-
uint32_t payload_length = 1 + msg_length;
541+
uint32_t payload_length = msg_length;
541542
uint32_t stamp_length =
542543
ros::serialization::serializationLength(ros::Time());
543-
uint32_t msg_offset = 1;
544+
uint32_t msg_offset = 0;
544545
if (ros2mqtt.stamped) {
545-
// allocate buffer with appropriate size to hold [1, S, R]
546+
// allocate buffer with appropriate size to hold [S, R]
546547
msg_offset += stamp_length;
547548
payload_length += stamp_length;
548549
payload_buffer.resize(payload_length);
549-
payload_buffer[0] = 1;
550550
} else {
551-
// allocate buffer with appropriate size to hold [0, R]
551+
// allocate buffer with appropriate size to hold [R]
552552
payload_buffer.resize(payload_length);
553-
payload_buffer[0] = 0;
554553
}
555554

556-
// serialize ROS message to payload [0/1, -, R]
555+
// serialize ROS message to payload [-, R]
557556
ros::serialization::OStream msg_stream(&payload_buffer[msg_offset],
558557
msg_length);
559558
ros::serialization::serialize(msg_stream, *ros_msg);
@@ -570,8 +569,8 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
570569
"running?",
571570
ros2mqtt.mqtt.topic.c_str());
572571

573-
// serialize timestamp to payload [1, S, R]
574-
ros::serialization::OStream stamp_stream(&payload_buffer[1],
572+
// serialize timestamp to payload [S, R]
573+
ros::serialization::OStream stamp_stream(&payload_buffer[0],
575574
stamp_length);
576575
ros::serialization::serialize(stamp_stream, stamp);
577576
}
@@ -603,16 +602,13 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
603602
auto& payload = mqtt_msg->get_payload_ref();
604603
uint32_t payload_length = static_cast<uint32_t>(payload.size());
605604

606-
// determine whether timestamp is injected by reading first element
607-
bool stamped = (static_cast<uint8_t>(payload[0]) > 0);
608-
609-
// read MQTT payload for ROS message (R) as [1, S, R] or [0, R]
610-
// where first item = 1 if timestamp (S) is included
611-
uint32_t msg_length = payload_length - 1;
612-
uint32_t msg_offset = 1;
605+
// read MQTT payload for ROS message (R) as [R]
606+
// or [S, R] if timestamp (S) is included
607+
uint32_t msg_length = payload_length;
608+
uint32_t msg_offset = 0;
613609

614610
// if stamped, compute latency
615-
if (stamped) {
611+
if (mqtt2ros.stamped) {
616612

617613
// create ROS message buffer on top of MQTT message payload
618614
char* non_const_payload = const_cast<char*>(&payload[1]);
@@ -888,6 +884,10 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
888884
// if ROS message type has changed
889885
if (ros_msg_type.md5 != mqtt2ros.ros.shape_shifter.getMD5Sum()) {
890886

887+
mqtt2ros.stamped = ros_msg_type.stamped;
888+
NODELET_INFO("ROS publisher message type on topic '%s' set to '%s'",
889+
mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
890+
891891
// configure ShapeShifter
892892
mqtt2ros.ros.shape_shifter.morph(ros_msg_type.md5, ros_msg_type.name,
893893
ros_msg_type.definition, "");
@@ -898,9 +898,6 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
898898
node_handle_, mqtt2ros.ros.topic, mqtt2ros.ros.queue_size,
899899
mqtt2ros.ros.latched);
900900

901-
NODELET_INFO("ROS publisher message type on topic '%s' set to '%s'",
902-
mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
903-
904901
// subscribe to MQTT topic with actual ROS messages
905902
client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);
906903
NODELET_DEBUG("Subscribed MQTT topic '%s'", mqtt_data_topic.c_str());

mqtt_client/src/MqttClient.ros2.cpp

Lines changed: 16 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -523,6 +523,7 @@ void MqttClient::ros2mqtt(
523523
// gather information on ROS message type in special ROS message
524524
mqtt_client_interfaces::msg::RosMsgType ros_msg_type;
525525
ros_msg_type.name = ros2mqtt.ros.msg_type;
526+
ros_msg_type.stamped = ros2mqtt.stamped;
526527

527528
RCLCPP_DEBUG(get_logger(), "Received ROS message of type '%s' on topic '%s'",
528529
ros_msg_type.name.c_str(), ros_topic.c_str());
@@ -570,27 +571,24 @@ void MqttClient::ros2mqtt(
570571
mqtt_topic.c_str(), e.what());
571572
}
572573

573-
// build MQTT payload for ROS message (R) as [1, S, R] or [0, R]
574-
// where first item = 1 if timestamp (S) is included
574+
// build MQTT payload for ROS message (R) as [R]
575+
// or [S, R] if timestamp (S) is included
575576
uint32_t msg_length = serialized_msg->size();
576-
uint32_t payload_length = 1 + msg_length;
577-
578-
uint32_t msg_offset = 1;
577+
uint32_t payload_length = msg_length;
578+
uint32_t msg_offset = 0;
579579
if (ros2mqtt.stamped) {
580-
// allocate buffer with appropriate size to hold [1, S, R]
580+
// allocate buffer with appropriate size to hold [S, R]
581581
msg_offset += stamp_length_;
582582
payload_length += stamp_length_;
583583
payload_buffer.resize(payload_length);
584-
payload_buffer[0] = 1;
585584
} else {
586-
// allocate buffer with appropriate size to hold [0, R]
585+
// allocate buffer with appropriate size to hold [R]
587586
payload_buffer.resize(payload_length);
588-
payload_buffer[0] = 0;
589587
}
590588

591589
// TODO: if not stamped, could create payload_buffer directly on top of
592590
// serialized_msg->get_rcl_serialized_message().buffer copy serialized ROS
593-
// message to payload [0/1, -, R]
591+
// message to payload [-, R]
594592
std::copy(serialized_msg->get_rcl_serialized_message().buffer,
595593
serialized_msg->get_rcl_serialized_message().buffer + msg_length,
596594
payload_buffer.begin() + msg_offset);
@@ -601,13 +599,13 @@ void MqttClient::ros2mqtt(
601599
// take current timestamp
602600
builtin_interfaces::msg::Time stamp(rclcpp::Clock(RCL_SYSTEM_TIME).now());
603601

604-
// copy serialized timestamp to payload [1, S, R]
602+
// copy serialized timestamp to payload [S, R]
605603
rclcpp::SerializedMessage serialized_stamp;
606604
serializeRosMessage(stamp, serialized_stamp);
607605
std::copy(
608606
serialized_stamp.get_rcl_serialized_message().buffer,
609607
serialized_stamp.get_rcl_serialized_message().buffer + stamp_length_,
610-
payload_buffer.begin() + 1);
608+
payload_buffer.begin());
611609
}
612610
}
613611

@@ -639,16 +637,13 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
639637
auto& payload = mqtt_msg->get_payload_ref();
640638
uint32_t payload_length = static_cast<uint32_t>(payload.size());
641639

642-
// determine whether timestamp is injected by reading first element
643-
bool stamped = (static_cast<uint8_t>(payload[0]) > 0);
644-
645-
// read MQTT payload for ROS message (R) as [1, S, R] or [0, R]
646-
// where first item = 1 if timestamp (S) is included
647-
uint32_t msg_length = payload_length - 1;
648-
uint32_t msg_offset = 1;
640+
// read MQTT payload for ROS message (R) as [R]
641+
// or [S, R] if timestamp (S) is included
642+
uint32_t msg_length = payload_length;
643+
uint32_t msg_offset = 0;
649644

650645
// if stamped, compute latency
651-
if (stamped) {
646+
if (mqtt2ros.stamped) {
652647

653648
// copy stamp to generic message buffer
654649
rclcpp::SerializedMessage serialized_stamp(stamp_length_);
@@ -890,6 +885,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
890885
if (ros_msg_type.name != mqtt2ros.ros.msg_type) {
891886

892887
mqtt2ros.ros.msg_type = ros_msg_type.name;
888+
mqtt2ros.stamped = ros_msg_type.stamped;
893889
RCLCPP_INFO(get_logger(),
894890
"ROS publisher message type on topic '%s' set to '%s'",
895891
mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());

mqtt_client_interfaces/msg/RosMsgType.msg

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,5 @@
22

33
string md5 # MD5 sum of ROS message type
44
string name # ROS message type name (e.g. 'std_msgs/Float32')
5-
string definition # ROS message type definition (e.g. 'float data')
5+
string definition # ROS message type definition (e.g. 'float data')
6+
bool stamped # whether timestamp is prepended for latency computation

0 commit comments

Comments
 (0)