Skip to content

Commit 0693ab7

Browse files
committed
optimize serialization to MQTT payload
1 parent 80a2a98 commit 0693ab7

File tree

1 file changed

+24
-30
lines changed

1 file changed

+24
-30
lines changed

src/MqttClient.cpp

Lines changed: 24 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -403,52 +403,46 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
403403
mqtt_topic.c_str(), e.what());
404404
}
405405

406-
// serialize ROS message to buffer
407-
uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
408-
std::vector<uint8_t> msg_buffer;
409-
msg_buffer.resize(msg_length);
410-
ros::serialization::OStream msg_stream(msg_buffer.data(), msg_length);
411-
ros::serialization::serialize(msg_stream, *ros_msg);
412-
413406
// build MQTT payload for ROS message (R) as [1, S, R] or [0, R]
414407
// where first item = 1 if timestamp (S) is included
408+
uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
415409
uint32_t payload_length = 1 + msg_length;
410+
uint32_t stamp_length = ros::serialization::serializationLength(ros::Time());
416411
uint32_t msg_offset = 1;
417412
std::vector<uint8_t> payload_buffer;
413+
if (ros2mqtt.stamped) {
414+
// allocate buffer with appropriate size to hold [1, S, R]
415+
msg_offset += stamp_length;
416+
payload_length += stamp_length;
417+
payload_buffer.resize(payload_length);
418+
payload_buffer[0] = 1;
419+
} else {
420+
// allocate buffer with appropriate size to hold [0, R]
421+
payload_buffer.resize(payload_length);
422+
payload_buffer[0] = 0;
423+
}
424+
425+
// serialize ROS message to payload [0/1, -, R]
426+
ros::serialization::OStream msg_stream(&payload_buffer[msg_offset],
427+
msg_length);
428+
ros::serialization::serialize(msg_stream, *ros_msg);
429+
430+
// inject timestamp as final step
418431
if (ros2mqtt.stamped) {
419432

420-
// serialize current timestamp
433+
// take current timestamp
421434
ros::WallTime stamp_wall = ros::WallTime::now();
422435
ros::Time stamp(stamp_wall.sec, stamp_wall.nsec);
423436
if (stamp.isZero())
424437
NODELET_WARN(
425438
"Injected ROS time 0 into MQTT payload on topic %s, is a ROS clock "
426439
"running?",
427440
ros2mqtt.mqtt.topic.c_str());
428-
uint32_t stamp_length = ros::serialization::serializationLength(stamp);
429-
std::vector<uint8_t> stamp_buffer;
430-
stamp_buffer.resize(stamp_length);
431-
ros::serialization::OStream stamp_stream(stamp_buffer.data(), stamp_length);
432-
ros::serialization::serialize(stamp_stream, stamp);
433-
434-
// inject timestamp into payload
435-
payload_length += stamp_length;
436-
msg_offset += stamp_length;
437-
payload_buffer.resize(payload_length);
438-
payload_buffer[0] = 1;
439-
payload_buffer.insert(payload_buffer.begin() + 1,
440-
std::make_move_iterator(stamp_buffer.begin()),
441-
std::make_move_iterator(stamp_buffer.end()));
442-
443-
} else {
444441

445-
payload_buffer.resize(payload_length);
446-
payload_buffer[0] = 0;
442+
// serialize timestamp to payload [1, S, R]
443+
ros::serialization::OStream stamp_stream(&payload_buffer[1], stamp_length);
444+
ros::serialization::serialize(stamp_stream, stamp);
447445
}
448-
// add ROS message to payload
449-
payload_buffer.insert(payload_buffer.begin() + msg_offset,
450-
std::make_move_iterator(msg_buffer.begin()),
451-
std::make_move_iterator(msg_buffer.end()));
452446

453447
// send ROS message to MQTT broker
454448
mqtt_topic = ros2mqtt.mqtt.topic;

0 commit comments

Comments
 (0)