Skip to content

Commit fc739f5

Browse files
authored
Merge pull request #4 from ika-rwth-aachen/improvement/runtime-optimization
Optimize runtime
2 parents df868ec + 9ecb734 commit fc739f5

File tree

2 files changed

+53
-58
lines changed

2 files changed

+53
-58
lines changed

include/mqtt_client/MqttClient.h

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -186,11 +186,11 @@ class MqttClient : public nodelet::Nodelet,
186186
* - serialized timestamp (optional)
187187
* - serialized ROS message
188188
*
189-
* The MQTT payload is expected to carry a serialized ROS message.
190-
*
191-
* @param mqtt_msg MQTT message
189+
* @param mqtt_msg MQTT message
190+
* @param arrival_stamp arrival timestamp used for latency computation
192191
*/
193-
void mqtt2ros(mqtt::const_message_ptr mqtt_msg);
192+
void mqtt2ros(mqtt::const_message_ptr mqtt_msg,
193+
const ros::WallTime& arrival_stamp = ros::WallTime::now());
194194

195195
/**
196196
* @brief Callback for when the client has successfully connected to the

src/MqttClient.cpp

Lines changed: 49 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,9 @@ void MqttClient::loadParameters() {
153153
}
154154
}
155155

156-
NODELET_INFO("Bridging ROS topic '%s' to MQTT topic '%s'",
157-
ros_topic.c_str(), ros2mqtt.mqtt.topic.c_str());
156+
NODELET_INFO("Bridging ROS topic '%s' to MQTT topic '%s' %s",
157+
ros_topic.c_str(), ros2mqtt.mqtt.topic.c_str(),
158+
ros2mqtt.stamped ? "and measuring latency" : "");
158159
} else {
159160
NODELET_WARN(
160161
"Parameter 'bridge/ros2mqtt[%d]' is missing subparameter "
@@ -403,52 +404,46 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
403404
mqtt_topic.c_str(), e.what());
404405
}
405406

406-
// serialize ROS message to buffer
407-
uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
408-
std::vector<uint8_t> msg_buffer;
409-
msg_buffer.resize(msg_length);
410-
ros::serialization::OStream msg_stream(msg_buffer.data(), msg_length);
411-
ros::serialization::serialize(msg_stream, *ros_msg);
412-
413407
// build MQTT payload for ROS message (R) as [1, S, R] or [0, R]
414408
// where first item = 1 if timestamp (S) is included
409+
uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
415410
uint32_t payload_length = 1 + msg_length;
411+
uint32_t stamp_length = ros::serialization::serializationLength(ros::Time());
416412
uint32_t msg_offset = 1;
417413
std::vector<uint8_t> payload_buffer;
414+
if (ros2mqtt.stamped) {
415+
// allocate buffer with appropriate size to hold [1, S, R]
416+
msg_offset += stamp_length;
417+
payload_length += stamp_length;
418+
payload_buffer.resize(payload_length);
419+
payload_buffer[0] = 1;
420+
} else {
421+
// allocate buffer with appropriate size to hold [0, R]
422+
payload_buffer.resize(payload_length);
423+
payload_buffer[0] = 0;
424+
}
425+
426+
// serialize ROS message to payload [0/1, -, R]
427+
ros::serialization::OStream msg_stream(&payload_buffer[msg_offset],
428+
msg_length);
429+
ros::serialization::serialize(msg_stream, *ros_msg);
430+
431+
// inject timestamp as final step
418432
if (ros2mqtt.stamped) {
419433

420-
// serialize current timestamp
434+
// take current timestamp
421435
ros::WallTime stamp_wall = ros::WallTime::now();
422436
ros::Time stamp(stamp_wall.sec, stamp_wall.nsec);
423437
if (stamp.isZero())
424438
NODELET_WARN(
425439
"Injected ROS time 0 into MQTT payload on topic %s, is a ROS clock "
426440
"running?",
427441
ros2mqtt.mqtt.topic.c_str());
428-
uint32_t stamp_length = ros::serialization::serializationLength(stamp);
429-
std::vector<uint8_t> stamp_buffer;
430-
stamp_buffer.resize(stamp_length);
431-
ros::serialization::OStream stamp_stream(stamp_buffer.data(), stamp_length);
432-
ros::serialization::serialize(stamp_stream, stamp);
433442

434-
// inject timestamp into payload
435-
payload_length += stamp_length;
436-
msg_offset += stamp_length;
437-
payload_buffer.resize(payload_length);
438-
payload_buffer[0] = 1;
439-
payload_buffer.insert(payload_buffer.begin() + 1,
440-
std::make_move_iterator(stamp_buffer.begin()),
441-
std::make_move_iterator(stamp_buffer.end()));
442-
443-
} else {
444-
445-
payload_buffer.resize(payload_length);
446-
payload_buffer[0] = 0;
443+
// serialize timestamp to payload [1, S, R]
444+
ros::serialization::OStream stamp_stream(&payload_buffer[1], stamp_length);
445+
ros::serialization::serialize(stamp_stream, stamp);
447446
}
448-
// add ROS message to payload
449-
payload_buffer.insert(payload_buffer.begin() + msg_offset,
450-
std::make_move_iterator(msg_buffer.begin()),
451-
std::make_move_iterator(msg_buffer.end()));
452447

453448
// send ROS message to MQTT broker
454449
mqtt_topic = ros2mqtt.mqtt.topic;
@@ -468,7 +463,8 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
468463
}
469464

470465

471-
void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg) {
466+
void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
467+
const ros::WallTime& arrival_stamp) {
472468

473469
std::string mqtt_topic = mqtt_msg->get_topic();
474470
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
@@ -486,20 +482,18 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg) {
486482
// if stamped, compute latency
487483
if (stamped) {
488484

489-
// copy timestamp from MQTT message to buffer
490-
ros::Time stamp;
491-
uint32_t stamp_length = ros::serialization::serializationLength(stamp);
492-
std::vector<uint8_t> stamp_buffer;
493-
stamp_buffer.resize(stamp_length);
494-
std::memcpy(stamp_buffer.data(), &(payload[1]), stamp_length);
485+
// create ROS message buffer on top of MQTT message payload
486+
char* non_const_payload = const_cast<char*>(&payload[1]);
487+
uint8_t* stamp_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
495488

496489
// deserialize stamp
497-
ros::serialization::IStream stamp_stream(stamp_buffer.data(), stamp_length);
490+
ros::Time stamp;
491+
uint32_t stamp_length = ros::serialization::serializationLength(stamp);
492+
ros::serialization::IStream stamp_stream(stamp_buffer, stamp_length);
498493
ros::serialization::deserialize(stamp_stream, stamp);
499494

500495
// compute ROS2MQTT latency
501-
ros::WallTime now_wall = ros::WallTime::now();
502-
ros::Time now(now_wall.sec, now_wall.nsec);
496+
ros::Time now(arrival_stamp.sec, arrival_stamp.nsec);
503497
if (now.isZero())
504498
NODELET_WARN(
505499
"Cannot compute latency for MQTT topic %s when ROS time is 0, is a ROS "
@@ -521,17 +515,16 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg) {
521515
msg_offset += stamp_length;
522516
}
523517

524-
// copy ROS message from MQTT message to buffer
525-
std::vector<uint8_t> msg_buffer;
526-
msg_buffer.resize(msg_length);
527-
std::memcpy(msg_buffer.data(), &(payload[msg_offset]), msg_length);
518+
// create ROS message buffer on top of MQTT message payload
519+
char* non_const_payload = const_cast<char*>(&payload[msg_offset]);
520+
uint8_t* msg_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
521+
ros::serialization::OStream msg_stream(msg_buffer, msg_length);
528522

529523
// publish via ShapeShifter
530524
NODELET_DEBUG(
531525
"Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
532526
mqtt2ros.ros.shape_shifter.getDataType().c_str(),
533527
mqtt2ros.ros.topic.c_str());
534-
ros::serialization::OStream msg_stream(msg_buffer.data(), msg_length);
535528
mqtt2ros.ros.shape_shifter.read(msg_stream);
536529
mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
537530
}
@@ -581,6 +574,9 @@ bool MqttClient::isConnectedService(IsConnected::Request& request,
581574

582575
void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
583576

577+
// instantly take arrival timestamp
578+
ros::WallTime arrival_stamp = ros::WallTime::now();
579+
584580
std::string mqtt_topic = mqtt_msg->get_topic();
585581
NODELET_DEBUG("Received MQTT message on topic '%s'", mqtt_topic.c_str());
586582
auto& payload = mqtt_msg->get_payload_ref();
@@ -590,14 +586,13 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
590586
mqtt_topic.find(kRosMsgTypeMqttTopicPrefix) != std::string::npos;
591587
if (msg_contains_ros_msg_type) {
592588

593-
// copy ROS message type from MQTT message to buffer
594-
RosMsgType ros_msg_type;
595-
std::vector<uint8_t> msg_type_buffer;
596-
msg_type_buffer.resize(payload_length);
597-
std::memcpy(msg_type_buffer.data(), &(payload[0]), payload_length);
589+
// create ROS message buffer on top of MQTT message payload
590+
char* non_const_payload = const_cast<char*>(&payload[0]);
591+
uint8_t* msg_type_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
598592

599593
// deserialize ROS message type
600-
ros::serialization::IStream msg_type_stream(msg_type_buffer.data(),
594+
RosMsgType ros_msg_type;
595+
ros::serialization::IStream msg_type_stream(msg_type_buffer,
601596
payload_length);
602597
ros::serialization::deserialize(msg_type_stream, ros_msg_type);
603598

@@ -632,7 +627,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
632627

633628
// publish ROS message, if publisher initialized
634629
if (!mqtt2ros_[mqtt_topic].ros.publisher.getTopic().empty()) {
635-
mqtt2ros(mqtt_msg);
630+
mqtt2ros(mqtt_msg, arrival_stamp);
636631
} else {
637632
NODELET_WARN(
638633
"ROS publisher for data from MQTT topic '%s' is not yet initialized: "

0 commit comments

Comments
 (0)