From 5d1d5440048fae925c4c711761e2ebbafdac9e52 Mon Sep 17 00:00:00 2001
From: ahmad-ra
Date: Sat, 17 Feb 2024 14:41:22 +0200
Subject: [PATCH 1/5] JSON support for full integration with non-ROS devices
for arbitrary ROS message types
---
README.md | 88 ++++++++++++---
mqtt_client/CMakeLists.txt | 1 +
mqtt_client/config/params.yaml | 2 +
mqtt_client/include/mqtt_client/MqttClient.h | 23 +++-
mqtt_client/src/MqttClient.cpp | 110 ++++++++++++++++---
5 files changed, 192 insertions(+), 32 deletions(-)
diff --git a/README.md b/README.md
index c00aa13..dde69c1 100644
--- a/README.md
+++ b/README.md
@@ -10,7 +10,7 @@
-The *mqtt_client* package provides a ROS nodelet or ROS 2 component node that enables connected ROS-based devices or robots to exchange ROS messages via an MQTT broker using the [MQTT](http://mqtt.org) protocol. This works generically for arbitrary ROS message types. The *mqtt_client* can also exchange primitive messages with MQTT clients running on devices not based on ROS.
+The *mqtt_client* package provides a ROS nodelet or ROS 2 component node that enables connected ROS-based and non-ROS-based devices or robots to exchange ROS messages via an MQTT broker using the [MQTT](http://mqtt.org) protocol. This works generically for arbitrary ROS message types. The *mqtt_client* can also exchange primitive messages with MQTT clients running on devices not based on ROS.
> [!IMPORTANT]
> This repository is open-sourced and maintained by the [**Institute for Automotive Engineering (ika) at RWTH Aachen University**](https://www.ika.rwth-aachen.de/).
@@ -26,6 +26,7 @@ The *mqtt_client* package provides a ROS nodelet or ROS 2 component node that en
- [Quick Start](#quick-start)
- [Launch](#launch)
- [Configuration](#configuration)
+- [JSON Messages](#json-messages)
- [Primitive Messages](#primitive-messages)
- [Latency Computation](#latency-computation)
- [Package Summary](#package-summary)
@@ -97,7 +98,8 @@ The *mqtt_client* is best configured with a ROS parameter *yaml* file. The confi
- MQTT messages received from the broker on MQTT topic `pingpong/ros` are published locally on ROS topic `/pong/ros`;
- primitive ROS messages received locally on ROS topic `/ping/primitive` are sent as primitive (string) messages to the broker on MQTT topic `pingpong/primitive`;
- MQTT messages received from the broker on MQTT topic `pingpong/primitive` are published locally as primitive ROS messages on ROS topic `/pong/primitive`.
-
+- ROS messages received locally on ROS topic `/ping/json` are sent to the broker in JSON format on MQTT topic `pingpong/json`;
+- MQTT messages received from the broker on MQTT topic `pingpong/json` are recieved as JSON and published locally as a ROS message on ROS topic `/pong/json`;
```yaml
broker:
host: localhost
@@ -109,12 +111,18 @@ bridge:
- ros_topic: /ping/primitive
mqtt_topic: pingpong/primitive
primitive: true
+ - ros_topic: /ping/json
+ mqtt_topic: pingpong/json
+ json: true
mqtt2ros:
- mqtt_topic: pingpong/ros
ros_topic: /pong/ros
- mqtt_topic: pingpong/primitive
ros_topic: /pong/primitive
primitive: true
+ - mqtt_topic: pingpong/json
+ ros_topic: /pong/json
+ json: true
```
#### Demo Client Launch
@@ -144,7 +152,7 @@ ros2 launch mqtt_client standalone.launch.ros2.xml
[ INFO] [1665575657.462622065]: Connected to broker at 'tcp://localhost:1883'
```
-Note that the *mqtt_client* successfully connected to the broker and also echoed which ROS/MQTT topics are being bridged. For testing the communication between *mqtt_client*, itself, and other MQTT clients, open five new terminals.
+Note that the *mqtt_client* successfully connected to the broker and also echoed which ROS/MQTT topics are being bridged. For testing the communication between *mqtt_client*, itself, and other MQTT clients, open eight new terminals.
In order to test the communication among *mqtt_clients*, publish any ROS message on ROS topic `/ping/ros` and wait for a response on ROS topic `/pong/ros`.
@@ -168,7 +176,46 @@ rostopic echo /pong/ros
ros2 topic echo /pong/ros
```
-In order to test the communication between *mqtt_client* and other MQTT clients, publish a primitive ROS message on ROS topic `/ping/primitive`, directly publish a primitive MQTT message on MQTT topic `pingpong/primitive` and wait for responses on ROS topic `/pong/primitive`. Note that you need to restart the ROS 2 *mqtt_client* with a different config file.
+In order to test the communication between *mqtt_client* and other MQTT clients, publish a ROS message on ROS topic `/ping/json`, directly publish a json MQTT message on MQTT topic `pingpong/json` and wait for responses on ROS topic `/pong/json`.
+
+```bash
+# 3rd terminal: publish ROS message to /ping/json
+
+# ROS
+rostopic pub -r 1 /ping/json geometry_msgs/PoseStamped "header:
+ seq: 1
+ stamp:
+ secs: 0
+ nsecs: 0
+ frame_id: 'map'
+pose:
+ position:
+ x: 1.0
+ y: 2.0
+ z: 3.0
+ orientation:
+ x: 0.0
+ y: 0.0
+ z: 0.0
+ w: 1.0"
+
+```
+
+```bash
+# 4th terminal: listen for primitive ROS messages on /pong/json
+
+# ROS
+rostopic echo /pong/json
+
+```
+
+```bash
+# 5th terminal: publish primitive MQTT message to pingpong/json directly using mosquitto_pub
+docker run --rm --network host eclipse-mosquitto mosquitto_pub -h localhost -t "pingpong/json" --repeat 20 --repeat-delay 1 -m "{\"header\":{\"seq\":1,\"stamp\":{\"secs\":0,\"nsecs\":0},\"frame_id\":\"map\"},\"pose\":{\"position\":{\"x\":1.0,\"y\":2.0,\"z\":0.0},\"orientation\":{\"x\":0.0,\"y\":0.0,\"z\":0.0,\"w\":1.0}}}"
+
+```
+
+In order to test the communication between *mqtt_client* and other MQTT clients for primitive messages, publish a primitive ROS message on ROS topic `/ping/primitive`, directly publish a primitive MQTT message on MQTT topic `pingpong/primitive` and wait for responses on ROS topic `/pong/primitive`. Note that you need to restart the ROS 2 *mqtt_client* with a different config file.
```bash
# ROS 2
@@ -177,7 +224,7 @@ ros2 launch mqtt_client standalone.launch.ros2.xml params_file:=$(ros2 pkg prefi
```
```bash
-# 3rd terminal: publish primitive ROS message to /ping/primitive
+# 6th terminal: publish primitive ROS message to /ping/primitive
# ROS
rostopic pub -r 1 /ping/primitive std_msgs/Int32 42
@@ -187,7 +234,7 @@ ros2 topic pub /ping/primitive std_msgs/msg/Int32 "{data: 42}"
```
```bash
-# 4th terminal: listen for primitive ROS messages on /pong/primitive
+# 7th terminal: listen for primitive ROS messages on /pong/primitive
# ROS
rostopic echo /pong/primitive
@@ -197,7 +244,7 @@ ros2 topic echo /pong/primitive
```
```bash
-# 5th terminal: publish primitive MQTT message to pingpong/primitive directly using mosquitto_pub
+# 8th terminal: publish primitive MQTT message to pingpong/primitive directly using mosquitto_pub
docker run --rm --network host eclipse-mosquitto mosquitto_pub -h localhost -t "pingpong/primitive" --repeat 20 --repeat-delay 1 -m 69
```
@@ -279,6 +326,7 @@ bridge:
- ros_topic: # ROS topic whose messages are transformed to MQTT messages
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
primitive: # [false] whether to publish as primitive message
+ json: # [false] whether message exchanging in in JSON format
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
advanced:
ros:
@@ -290,6 +338,7 @@ bridge:
- mqtt_topic: # MQTT topic on which messages are received from the broker
ros_topic: # ROS topic on which corresponding MQTT messages are published
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
+ json: # [false] whether message exchanging in in JSON format
advanced:
mqtt:
qos: # [0] MQTT QoS value
@@ -329,15 +378,26 @@ bridge:
latched: # [false] whether to latch ROS message
```
+
+## JSON Messages
+
+As seen in the [Quick Start](#quick-start), the *mqtt_client* can not only exchange arbitrary ROS messages with other *mqtt_clients*, but it can also exchange arbitrary messages with other non-*mqtt_client* MQTT clients using JSON. This allows ROS-based devices to exchange ROS messages with devices not based on ROS, and non-*mqtt_client* MQTT clients can send a JSON string with a format that matches the needed ROS message to be casted into. The `json` parameter can be set for both ROS-to-MQTT (`bridge/ros2mqtt`) and for MQTT-to-ROS (`bridge/mqtt2ros`) transmissions.
+
+If a ROS-to-MQTT transmission is configured as `json`, the message is published as a serialized JSON string.
+
+If an MQTT-to-ROS transmission is configured as `json`, the MQTT message is interpreted as a JSON string and transformed into a ROS message to be published.
+
+The `json` property of both Ros-to-MQTT and MQTT-to-ROS should be equal. Currently this is only supported for ROS, support of ROS2 will come soon.
+
+
## Primitive Messages
-As seen in the [Quick Start](#quick-start), the *mqtt_client* can not only exchange arbitrary ROS messages with other *mqtt_clients*, but it can also exchange primitive message data with other non-*mqtt_client* MQTT clients. This allows ROS-based devices to exchange primitive messages with devices not based on ROS. The `primitive` parameter can be set for both ROS-to-MQTT (`bridge/ros2mqtt`) and for MQTT-to-ROS (`bridge/mqtt2ros`) transmissions.
+Additionally, the *mqtt_client* can also exchange primitive message data with other non-*mqtt_client* MQTT clients. This allows ROS-based devices to exchange primitive messages with devices not based on ROS. The `primitive` parameter can be set for both ROS-to-MQTT (`bridge/ros2mqtt`) and for MQTT-to-ROS (`bridge/mqtt2ros`) transmissions.
If a ROS-to-MQTT transmission is configured as `primitive` and the ROS message type is one of the supported primitive ROS message types, the raw data is published as a string. The supported primitive ROS message types are [`std_msgs/String`](http://docs.ros.org/en/api/std_msgs/html/msg/String.html), [`std_msgs/Bool`](http://docs.ros.org/en/api/std_msgs/html/msg/Bool.html), [`std_msgs/Char`](http://docs.ros.org/en/api/std_msgs/html/msg/Char.html), [`std_msgs/UInt8`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt8.html), [`std_msgs/UInt16`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt16.html), [`std_msgs/UInt32`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt32.html), [`std_msgs/UInt64`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt16.html), [`std_msgs/Int8`](http://docs.ros.org/en/api/std_msgs/html/msg/Int8.html), [`std_msgs/Int16`](http://docs.ros.org/en/api/std_msgs/html/msg/Int16.html), [`std_msgs/Int32`](http://docs.ros.org/en/api/std_msgs/html/msg/Int32.html), [`std_msgs/Int64`](http://docs.ros.org/en/api/std_msgs/html/msg/Int64.html), [`std_msgs/Float32`](http://docs.ros.org/en/api/std_msgs/html/msg/Float32.html), [`std_msgs/Float32`](http://docs.ros.org/en/api/std_msgs/html/msg/Float64.html).
If an MQTT-to-ROS transmission is configured as `primitive`, the MQTT message is interpreted and published as a primitive data type, if possible. The message is probed in the following order: `bool` ([`std_msgs/Bool`](http://docs.ros.org/en/api/std_msgs/html/msg/Bool.html)), `int` ([`std_msgs/Int32`](http://docs.ros.org/en/api/std_msgs/html/msg/Int32.html)), `float` ([`std_msgs/Float32`](http://docs.ros.org/en/api/std_msgs/html/msg/Float32.html)), `string` ([`std_msgs/String`](http://docs.ros.org/en/api/std_msgs/html/msg/String.html)).
-
## Latency Computation
The *mqtt_client* provides built-in functionality to measure the latency of transferring a ROS message via an MQTT broker back to ROS. Note that this functionality is only available for non-primitive messages (see [Primitive Messages](#primitive-messages)). To this end, the sending client injects the current timestamp into the MQTT message. The receiving client can then compute the latency between message reception time and the injected timestamp. **Naturally, this is only accurate to the level of synchronization between clocks on sending and receiving machine.**
@@ -440,21 +500,23 @@ See [Configuration](#configuration).
### ROS
-The *mqtt_client* is able to bridge ROS messages of arbitrary message type to an MQTT broker. To this end, it needs to employ generic ROS subscribers and publishers, which only take shape at runtime.
+The *mqtt_client* is able to bridge ROS messages of arbitrary message type to an MQTT broker. In addition, it is able to exchange arbitrary ROS messages with non-ROS-based devices, using JSON interpretations of ROS messages. To this end, it needs to employ generic ROS subscribers, publishers, and JSON parsers, which only take shape at runtime.
-These generic ROS subscribers and publishers are realized through [topic_tools::ShapeShifter](http://docs.ros.org/diamondback/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html). For each pair of `ros_topic` and `mqtt_topic` specified under `bridge/ros2mqtt/`, a ROS subscriber is setup with the following callback signature:
+The generic JSON parser is realized through the updated version of [RosMsgParser::Parser](https://github.com/ahmad-ra/ros_msg_parser/tree/master), and the generic ROS subscribers and publishers are realized through [topic_tools::ShapeShifter](http://docs.ros.org/diamondback/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html). For each pair of `ros_topic` and `mqtt_topic` specified under `bridge/ros2mqtt/`, a ROS subscriber is setup with the following callback signature:
```cpp
void ros2mqtt(topic_tools::ShapeShifter::ConstPtr&, std::string&)
```
-Inside the callback, the generic messages received on the `ros_topic` are serialized using [ros::serialization](http://wiki.ros.org/roscpp/Overview/MessagesSerializationAndAdaptingTypes). The serialized form is then ready to be sent to the MQTT broker on the specified `mqtt_topic`.
+Inside the callback, the generic messages received on the `ros_topic` are serialized using [ros::serialization](http://wiki.ros.org/roscpp/Overview/MessagesSerializationAndAdaptingTypes). The serialized form is then either sent to the MQTT broker on the specified `mqtt_topic`, or transformed into a serialized JSON representation of the ROS message using [RosMsgParser::Parser::deserializeIntoJson](https://github.com/ahmad-ra/ros_msg_parser/blob/master/include/ros_msg_parser/ros_parser.hpp#L159C10-L159C29) before sending through `mqtt_topic`.
Upon retrieval of an MQTT message, it is republished as a ROS message on the ROS network. To this end, [topic_tools::ShapeShifter::morph](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a2b74b522fb49dac05d48f466b6b87d2d) is used to have the ShapeShifter publisher take the shape of the specific ROS message type.
+If the recieved MQTT message is in JSON format, it is converted to a serialized ROS message using [RosMsgParser::Parser::serializeFromJson](https://github.com/ahmad-ra/ros_msg_parser/blob/master/include/ros_msg_parser/ros_parser.hpp#L164C10-L164C27)
+
The required metainformation on the ROS message type can however only be extracted in the ROS subscriber callback of the publishing *mqtt_client* with calls to [topic_tools::ShapeShifter::getMD5Sum](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#af05fbf42757658e4c6a0f99ff72f7daa), [topic_tools::ShapeShifter::getDataType](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a9d57b2285213fda5e878ce7ebc42f0fb), and [topic_tools::ShapeShifter::getMessageDefinition](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a503af7234eeba0ccefca64c4d0cc1df0). These attributes are wrapped in a ROS message of custom type [mqtt_client::RosMsgType](mqtt_client_interfaces/msg/RosMsgType.msg), serialized using [ros::serialization](http://wiki.ros.org/roscpp/Overview/MessagesSerializationAndAdaptingTypes) and also shared via the MQTT broker on a special topic.
-When an *mqtt_client* receives such ROS message type metainformation, it configures the corresponding ROS ShapeShifter publisher using [topic_tools::ShapeShifter::morph](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a2b74b522fb49dac05d48f466b6b87d2d).
+When an *mqtt_client* receives such ROS message type metainformation, it configures the corresponding ROS ShapeShifter publisher using [topic_tools::ShapeShifter::morph](http://docs.ros.org/indigo/api/topic_tools/html/classtopic__tools_1_1ShapeShifter.html#a2b74b522fb49dac05d48f466b6b87d2d), as well as instantiates a [RosMsgParser::Parser](https://github.com/ahmad-ra/ros_msg_parser/tree/master) object that matched the metainformation.
The *mqtt_client* also provides functionality to measure the latency of transferring a ROS message via an MQTT broker back to ROS. To this end, the sending client injects the current timestamp into the MQTT message. The receiving client can then compute the latency between message reception time and the injected timestamp. The information about whether a timestamp is injected is also included in the custom [mqtt_client::RosMsgType](mqtt_client_interfaces/msg/RosMsgType.msg) message that is sent before. The actual `std::vector` message payload takes on one of the following forms:
diff --git a/mqtt_client/CMakeLists.txt b/mqtt_client/CMakeLists.txt
index a413faa..3550cb2 100644
--- a/mqtt_client/CMakeLists.txt
+++ b/mqtt_client/CMakeLists.txt
@@ -95,6 +95,7 @@ elseif(${ROS_VERSION} EQUAL 1)
rosfmt ## For fmt::format.
std_msgs
topic_tools
+ ros_msg_parser
)
## System dependencies are found with CMake's conventions
diff --git a/mqtt_client/config/params.yaml b/mqtt_client/config/params.yaml
index bf0a2d2..617f94d 100644
--- a/mqtt_client/config/params.yaml
+++ b/mqtt_client/config/params.yaml
@@ -5,12 +5,14 @@ bridge:
ros2mqtt:
- ros_topic: /ping/ros
mqtt_topic: pingpong/ros
+ json: true
- ros_topic: /ping/primitive
mqtt_topic: pingpong/primitive
primitive: true
mqtt2ros:
- mqtt_topic: pingpong/ros
ros_topic: /pong/ros
+ json: true
- mqtt_topic: pingpong/primitive
ros_topic: /pong/primitive
primitive: true
\ No newline at end of file
diff --git a/mqtt_client/include/mqtt_client/MqttClient.h b/mqtt_client/include/mqtt_client/MqttClient.h
index 2472132..cabb40a 100644
--- a/mqtt_client/include/mqtt_client/MqttClient.h
+++ b/mqtt_client/include/mqtt_client/MqttClient.h
@@ -37,9 +37,11 @@ SOFTWARE.
#include
#include
#include // fmt::format, fmt::join
+#include
#include
-
-
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
/**
* @brief Namespace for the mqtt_client package
*/
@@ -385,7 +387,12 @@ class MqttClient : public nodelet::Nodelet,
bool retained = false; ///< whether to retain MQTT message
} mqtt; ///< MQTT-related variables
bool primitive = false; ///< whether to publish as primitive message
- bool stamped = false; ///< whether to inject timestamp in MQTT message
+ bool json = false; ///< whether the serial messages flowing through MQTT
+ ///< broker are JSON format
+ std::shared_ptr< RosMsgParser::Parser> json_parser; ///< parser from json to ROS message and vice-versa
+
+
+ bool stamped = false; ///< whether to inject timestamp in MQTT message
};
/**
@@ -405,7 +412,11 @@ class MqttClient : public nodelet::Nodelet,
} ros; ///< ROS-related variables
bool primitive = false; ///< whether to publish as primitive message (if
///< coming from non-ROS MQTT client)
- bool stamped = false; ///< whether timestamp is injected
+ bool json = false; ///< whether the serial messages flowing through MQTT
+ ///< broker are JSON format
+ std::shared_ptr< RosMsgParser::Parser> json_parser; ///< parser from json to ROS message and vice-versa
+
+ bool stamped = false; ///< whether timestamp is injected
};
protected:
@@ -517,11 +528,11 @@ bool MqttClient::loadParameter(const std::string& key, std::vector& value)
template
bool MqttClient::loadParameter(const std::string& key, std::vector& value,
const std::vector& default_value)
-{
+ {
const bool found = private_node_handle_.param(key, value, default_value);
if (!found)
NODELET_WARN("Parameter '%s' not set, defaulting to '%s'", key.c_str(),
- fmt::format("{}", fmt::join(value, ", ")).c_str());
+ fmt::format("{}", fmt::join(value, ", ")).c_str());
if (found)
NODELET_DEBUG("Retrieved parameter '%s' = '%s'", key.c_str(),
fmt::format("{}", fmt::join(value, ", ")).c_str());
diff --git a/mqtt_client/src/MqttClient.cpp b/mqtt_client/src/MqttClient.cpp
index 44f9e89..d22b27f 100644
--- a/mqtt_client/src/MqttClient.cpp
+++ b/mqtt_client/src/MqttClient.cpp
@@ -235,6 +235,10 @@ void MqttClient::loadParameters() {
if (ros2mqtt_params[k].hasMember("primitive"))
ros2mqtt.primitive = ros2mqtt_params[k]["primitive"];
+ // ros2mqtt[k]/json
+ if (ros2mqtt_params[k].hasMember("json"))
+ ros2mqtt.json = ros2mqtt_params[k]["json"];
+
// ros2mqtt[k]/inject_timestamp
if (ros2mqtt_params[k].hasMember("inject_timestamp"))
ros2mqtt.stamped = ros2mqtt_params[k]["inject_timestamp"];
@@ -295,6 +299,10 @@ void MqttClient::loadParameters() {
if (mqtt2ros_params[k].hasMember("primitive"))
mqtt2ros.primitive = mqtt2ros_params[k]["primitive"];
+ // mqtt2ros[k]/json
+ if (mqtt2ros_params[k].hasMember("json"))
+ mqtt2ros.json = mqtt2ros_params[k]["json"];
+
// mqtt2ros[k]/advanced
if (mqtt2ros_params[k].hasMember("advanced")) {
XmlRpc::XmlRpcValue& advanced_params =
@@ -504,7 +512,65 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
NODELET_DEBUG("Received ROS message of type '%s' on topic '%s'",
ros_msg_type.name.c_str(), ros_topic.c_str());
- if (ros2mqtt.primitive) { // publish as primitive (string) message
+ if (ros2mqtt.json) {
+
+
+ // if ROS message type has changed
+ if (ros_msg_type.md5 !=
+ mqtt2ros_[ros2mqtt.mqtt.topic].ros.shape_shifter.getMD5Sum()) {
+
+ Mqtt2RosInterface& mqtt2ros = mqtt2ros_[ros2mqtt.mqtt.topic];
+
+ NODELET_INFO("ROS publisher message type on topic '%s' set to '%s'",
+ mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
+
+ // configure ShapeShifter
+ mqtt2ros.ros.shape_shifter.morph(ros_msg_type.md5, ros_msg_type.name,
+ ros_msg_type.definition, "");
+
+ // advertise with ROS publisher
+ mqtt2ros.ros.publisher.shutdown();
+ mqtt2ros.ros.publisher = mqtt2ros.ros.shape_shifter.advertise(
+ node_handle_, mqtt2ros.ros.topic, mqtt2ros.ros.queue_size,
+ mqtt2ros.ros.latched);
+
+ // subscribe to MQTT topic with actual ROS messages
+ client_->subscribe(ros2mqtt.mqtt.topic, mqtt2ros.mqtt.qos);
+
+
+ // configure json parser
+ ros2mqtt.json_parser = std::make_shared(ros_topic, ros_msg_type.name, ros_msg_type.definition);
+
+ mqtt2ros_[ros2mqtt.mqtt.topic].json_parser = ros2mqtt.json_parser;
+ }
+
+ // deserialize the shapeshifter message into json
+ std::vector tmp_buffer;
+ std::string payload;
+ tmp_buffer.resize(ros_msg->size());
+ ros::serialization::OStream stream(tmp_buffer.data(), tmp_buffer.size());
+ ros_msg->write(stream);
+ ros2mqtt.json_parser->deserializeIntoJson(
+ RosMsgParser::Span(tmp_buffer), &payload, false, false, false);
+
+ // parse the created json string from format {topic:{t} , msg: {m}} into {m}
+ size_t startIdx = payload.find("msg");
+ if (startIdx != std::string::npos) {
+ startIdx += 6; // Move past "(msg:)"
+ size_t endIdx = payload.rfind("}");
+ if (endIdx != std::string::npos)
+ payload = "{" + payload.substr(startIdx, endIdx - startIdx);
+ else
+ NODELET_ERROR("not the expected json format");
+ } else
+ NODELET_ERROR("not the expected json format");
+
+ // serialize the json string into payload_buffer
+ payload_buffer = std::vector(payload.begin(), payload.end());
+ }
+
+
+ else if (ros2mqtt.primitive) { // publish as primitive (string) message
// resolve ROS messages to primitive strings if possible
std::string payload;
@@ -606,7 +672,7 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
const ros::WallTime& arrival_stamp) {
-
+
std::string mqtt_topic = mqtt_msg->get_topic();
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
auto& payload = mqtt_msg->get_payload_ref();
@@ -652,19 +718,37 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
msg_length -= stamp_length;
msg_offset += stamp_length;
}
+ if (mqtt2ros.json) {
+ // the message is a serialized json string
+ std::vector msg_buffer;
+ std::string msgStr = mqtt_msg->get_payload_str();
+
+ // convert json into a serialized ROS message, using the json_parser object
+ mqtt2ros.json_parser->serializeFromJson(msg_buffer, &msgStr);
+ ros::serialization::OStream msg_stream(&msg_buffer[0], msg_buffer.size());
+ // publish via ShapeShifter
+ NODELET_DEBUG(
+ "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
+ mqtt2ros.ros.shape_shifter.getDataType().c_str(),
+ mqtt2ros.ros.topic.c_str());
+
+ mqtt2ros.ros.shape_shifter.read(msg_stream);
+ mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
+ }
- // create ROS message buffer on top of MQTT message payload
- char* non_const_payload = const_cast(&payload[msg_offset]);
- uint8_t* msg_buffer = reinterpret_cast(non_const_payload);
- ros::serialization::OStream msg_stream(msg_buffer, msg_length);
+ else { // create ROS message buffer on top of MQTT message payload
+ char* non_const_payload = const_cast(&payload[msg_offset]);
+ uint8_t* msg_buffer = reinterpret_cast(non_const_payload);
+ ros::serialization::OStream msg_stream(msg_buffer, msg_length);
- // publish via ShapeShifter
- NODELET_DEBUG(
- "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
- mqtt2ros.ros.shape_shifter.getDataType().c_str(),
- mqtt2ros.ros.topic.c_str());
- mqtt2ros.ros.shape_shifter.read(msg_stream);
- mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
+ // publish via ShapeShifter
+ NODELET_DEBUG(
+ "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
+ mqtt2ros.ros.shape_shifter.getDataType().c_str(),
+ mqtt2ros.ros.topic.c_str());
+ mqtt2ros.ros.shape_shifter.read(msg_stream);
+ mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
+ }
}
From 8d475eae8c0bb8a70550ee55db0fdb18fe832e8b Mon Sep 17 00:00:00 2001
From: ahmad-ra
Date: Sun, 3 Mar 2024 17:42:49 +0200
Subject: [PATCH 2/5] JSON support in ROS/ROS2 for full integration with
non-ROS devices for arbitrary ROS message types.
---
README.md | 26 ++++-
mqtt_client/CMakeLists.txt | 19 +++-
mqtt_client/include/mqtt_client/MqttClient.h | 8 +-
.../include/mqtt_client/MqttClient.ros2.hpp | 11 ++-
mqtt_client/src/MqttClient.cpp | 9 +-
mqtt_client/src/MqttClient.ros2.cpp | 97 ++++++++++++++++++-
6 files changed, 152 insertions(+), 18 deletions(-)
diff --git a/README.md b/README.md
index dde69c1..a706472 100644
--- a/README.md
+++ b/README.md
@@ -199,6 +199,24 @@ pose:
z: 0.0
w: 1.0"
+# ROS2
+ros2 topic pub /ping/json geometry_msgs/PoseStamped "header:
+ seq: 1
+ stamp:
+ secs: 0
+ nsecs: 0
+ frame_id: 'map'
+pose:
+ position:
+ x: 1.0
+ y: 2.0
+ z: 3.0
+ orientation:
+ x: 0.0
+ y: 0.0
+ z: 0.0
+ w: 1.0"
+
```
```bash
@@ -207,6 +225,9 @@ pose:
# ROS
rostopic echo /pong/json
+# ROS2
+ros2 topic echo /pong/json
+
```
```bash
@@ -357,6 +378,7 @@ bridge:
{{ ros_topic_name }}:
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
primitive: # [false] whether to publish as primitive message
+ json: # [false] whether message exchanging in in JSON format
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
advanced:
ros:
@@ -370,6 +392,7 @@ bridge:
{{ mqtt_topic_name }}:
ros_topic: # ROS topic on which corresponding MQTT messages are published
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
+ json: # [false] whether message exchanging in in JSON format
advanced:
mqtt:
qos: # [0] MQTT QoS value
@@ -387,8 +410,7 @@ If a ROS-to-MQTT transmission is configured as `json`, the message is published
If an MQTT-to-ROS transmission is configured as `json`, the MQTT message is interpreted as a JSON string and transformed into a ROS message to be published.
-The `json` property of both Ros-to-MQTT and MQTT-to-ROS should be equal. Currently this is only supported for ROS, support of ROS2 will come soon.
-
+The `json` property of both Ros-to-MQTT and MQTT-to-ROS should be equal.
## Primitive Messages
diff --git a/mqtt_client/CMakeLists.txt b/mqtt_client/CMakeLists.txt
index 3550cb2..28d6fb0 100644
--- a/mqtt_client/CMakeLists.txt
+++ b/mqtt_client/CMakeLists.txt
@@ -20,11 +20,14 @@ if(${ROS_VERSION} EQUAL 2)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
find_package(std_msgs REQUIRED)
+ find_package(fastcdr REQUIRED)
# Paho MQTT C++ apt package doesn't include CMake config
# find_package(PahoMqttCpp REQUIRED)
find_library(PahoMqttC_LIBRARY libpaho-mqtt3as.so.1 REQUIRED)
find_library(PahoMqttCpp_LIBRARY libpaho-mqttpp3.so.1 REQUIRED)
+ set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../rosx_introspection/include)
+ set(rox_introspection_LIBRARY ${ROSX_INTROSPECTION_INCLUDE_DIR}/../build/librosx_introspection.a)
add_library(${PROJECT_NAME}_lib SHARED src/MqttClient.ros2.cpp)
@@ -35,11 +38,15 @@ if(${ROS_VERSION} EQUAL 2)
target_include_directories(${PROJECT_NAME}_lib PUBLIC
$
- $)
+ $
+ ${ROSX_INTROSPECTION_INCLUDE_DIR}
+ )
target_link_libraries(${PROJECT_NAME}_lib
${PahoMqttC_LIBRARY}
${PahoMqttCpp_LIBRARY}
+ ${rox_introspection_LIBRARY}
+ fastcdr
)
ament_target_dependencies(${PROJECT_NAME}_lib
@@ -95,13 +102,14 @@ elseif(${ROS_VERSION} EQUAL 1)
rosfmt ## For fmt::format.
std_msgs
topic_tools
- ros_msg_parser
)
## System dependencies are found with CMake's conventions
find_package(PahoMqttCpp REQUIRED)
set(PahoMqttCpp_LIBRARIES PahoMqttCpp::paho-mqttpp3)
-
+ find_package(fastcdr REQUIRED)
+ set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../rosx_introspection/include)
+ set(rox_introspection_LIBRARY ${ROSX_INTROSPECTION_INCLUDE_DIR}/../build/librosx_introspection.a)
## Uncomment this if the package has a setup.py. This macro ensures
## modules and global scripts declared therein get installed
@@ -207,6 +215,7 @@ elseif(${ROS_VERSION} EQUAL 1)
include_directories(
include
${catkin_INCLUDE_DIRS}
+ ${ROSX_INTROSPECTION_INCLUDE_DIR}
)
## Declare a C++ library
@@ -238,7 +247,9 @@ elseif(${ROS_VERSION} EQUAL 1)
target_link_libraries(${PROJECT_NAME}
${catkin_LIBRARIES}
${PahoMqttCpp_LIBRARIES}
- )
+ ${rox_introspection_LIBRARY}
+ fastcdr
+ )
#############
## Install ##
diff --git a/mqtt_client/include/mqtt_client/MqttClient.h b/mqtt_client/include/mqtt_client/MqttClient.h
index cabb40a..638e4c8 100644
--- a/mqtt_client/include/mqtt_client/MqttClient.h
+++ b/mqtt_client/include/mqtt_client/MqttClient.h
@@ -37,7 +37,7 @@ SOFTWARE.
#include
#include
#include // fmt::format, fmt::join
-#include
+#include
#include
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
@@ -389,8 +389,8 @@ class MqttClient : public nodelet::Nodelet,
bool primitive = false; ///< whether to publish as primitive message
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
- std::shared_ptr< RosMsgParser::Parser> json_parser; ///< parser from json to ROS message and vice-versa
-
+ std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
+
bool stamped = false; ///< whether to inject timestamp in MQTT message
};
@@ -414,7 +414,7 @@ class MqttClient : public nodelet::Nodelet,
///< coming from non-ROS MQTT client)
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
- std::shared_ptr< RosMsgParser::Parser> json_parser; ///< parser from json to ROS message and vice-versa
+ std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
bool stamped = false; ///< whether timestamp is injected
};
diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
index c9a2d38..6e65f31 100644
--- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
+++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
@@ -41,7 +41,10 @@ SOFTWARE.
#include
#include
#include
-
+#include
+#include "rapidjson/document.h"
+#include "rapidjson/stringbuffer.h"
+#include "rapidjson/writer.h"
/**
* @brief Namespace for the mqtt_client package
@@ -404,6 +407,9 @@ class MqttClient : public rclcpp::Node,
bool retained = false; ///< whether to retain MQTT message
} mqtt; ///< MQTT-related variables
bool primitive = false; ///< whether to publish as primitive message
+ bool json = false; ///< whether the serial messages flowing through MQTT
+ ///< broker are JSON format
+ std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
bool stamped = false; ///< whether to inject timestamp in MQTT message
};
@@ -426,6 +432,9 @@ class MqttClient : public rclcpp::Node,
} ros; ///< ROS-related variables
bool primitive = false; ///< whether to publish as primitive message (if
///< coming from non-ROS MQTT client)
+ bool json = false; ///< whether the serial messages flowing through MQTT
+ ///< broker are JSON format
+ std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
bool stamped = false; ///< whether timestamp is injected
};
diff --git a/mqtt_client/src/MqttClient.cpp b/mqtt_client/src/MqttClient.cpp
index d22b27f..4cff0f9 100644
--- a/mqtt_client/src/MqttClient.cpp
+++ b/mqtt_client/src/MqttClient.cpp
@@ -539,8 +539,8 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
// configure json parser
- ros2mqtt.json_parser = std::make_shared(ros_topic, ros_msg_type.name, ros_msg_type.definition);
-
+ ros2mqtt.json_parser = std::make_shared>();
+ ros2mqtt.json_parser->registerParser(ros2mqtt.mqtt.topic, ros_msg_type.name, ros_msg_type.definition);
mqtt2ros_[ros2mqtt.mqtt.topic].json_parser = ros2mqtt.json_parser;
}
@@ -550,8 +550,7 @@ void MqttClient::ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
tmp_buffer.resize(ros_msg->size());
ros::serialization::OStream stream(tmp_buffer.data(), tmp_buffer.size());
ros_msg->write(stream);
- ros2mqtt.json_parser->deserializeIntoJson(
- RosMsgParser::Span(tmp_buffer), &payload, false, false, false);
+ payload =*(ros2mqtt.json_parser->deserializeIntoJson(ros2mqtt.mqtt.topic,RosMsgParser::Span(tmp_buffer),false));
// parse the created json string from format {topic:{t} , msg: {m}} into {m}
size_t startIdx = payload.find("msg");
@@ -724,7 +723,7 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
std::string msgStr = mqtt_msg->get_payload_str();
// convert json into a serialized ROS message, using the json_parser object
- mqtt2ros.json_parser->serializeFromJson(msg_buffer, &msgStr);
+ msg_buffer= mqtt2ros.json_parser->serializeFromJson( mqtt_topic, &msgStr);
ros::serialization::OStream msg_stream(&msg_buffer[0], msg_buffer.size());
// publish via ShapeShifter
NODELET_DEBUG(
diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp
index 5d6247b..e8488d5 100644
--- a/mqtt_client/src/MqttClient.ros2.cpp
+++ b/mqtt_client/src/MqttClient.ros2.cpp
@@ -294,6 +294,11 @@ void MqttClient::loadParameters() {
if (get_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), primitive_param))
ros2mqtt.primitive = primitive_param.as_bool();
+ // ros2mqtt[k]/json
+ rclcpp::Parameter json_param;
+ if (get_parameter(fmt::format("bridge.ros2mqtt.{}.json", ros_topic), json_param))
+ ros2mqtt.json = json_param.as_bool();
+
// ros2mqtt[k]/inject_timestamp
rclcpp::Parameter stamped_param;
if (get_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), stamped_param))
@@ -350,6 +355,11 @@ void MqttClient::loadParameters() {
if (get_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), primitive_param))
mqtt2ros.primitive = primitive_param.as_bool();
+ // ros2mqtt[k]/json
+ rclcpp::Parameter json_param;
+ if (get_parameter(fmt::format("bridge.mqtt2ros.{}.json", ros_topic), json_param))
+ mqtt2ros.json = json_param.as_bool();
+
// mqtt2ros[k]/advanced/mqtt/qos
rclcpp::Parameter qos_param;
if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param))
@@ -596,7 +606,66 @@ void MqttClient::ros2mqtt(
RCLCPP_DEBUG(get_logger(), "Received ROS message of type '%s' on topic '%s'",
ros_msg_type.name.c_str(), ros_topic.c_str());
- if (ros2mqtt.primitive) { // publish as primitive (string) message
+ if (ros2mqtt.json) {
+
+ // if ROS message type has changed or if mapping is stale
+ if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) {
+
+ mqtt2ros.ros.msg_type = ros_msg_type.name;
+ mqtt2ros.stamped = ros_msg_type.stamped;
+ RCLCPP_INFO(get_logger(),
+ "ROS publisher message type on topic '%s' set to '%s'",
+ mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
+
+ // recreate generic publisher
+ try {
+ mqtt2ros.ros.publisher = create_generic_publisher(
+ mqtt2ros.ros.topic, ros_msg_type.name, mqtt2ros.ros.queue_size);
+ } catch (rclcpp::exceptions::RCLError& e) {
+ RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s",
+ e.what());
+ return;
+ }
+ mqtt2ros.ros.is_stale = false;
+
+ // subscribe to MQTT topic with actual ROS messages
+ client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);
+ RCLCPP_DEBUG(get_logger(), "Subscribed MQTT topic '%s'",
+ mqtt_data_topic.c_str());
+
+ // configure json parser
+ ros2mqtt.json_parser = std::make_shared>();
+ ros2mqtt.json_parser->registerParser(ros2mqtt.mqtt.topic, RosMsgParser::ROSType(ros_msg_type.name), RosMsgParser::GetMessageDefinition(ros_msg_type.name));
+ mqtt2ros_[ros2mqtt.mqtt.topic].json_parser = ros2mqtt.json_parser;
+
+ }
+
+
+ // deserialize the shapeshifter message into json
+ std::vector tmp_buffer;
+ std::string payload;
+ tmp_buffer.resize(ros_msg->size());
+ ros::serialization::OStream stream(tmp_buffer.data(), tmp_buffer.size());
+ ros_msg->write(stream);
+ payload =*(ros2mqtt.json_parser->deserializeIntoJson(ros2mqtt.mqtt.topic,RosMsgParser::Span(tmp_buffer),false));
+
+ // parse the created json string from format {topic:{t} , msg: {m}} into {m}
+ size_t startIdx = payload.find("msg");
+ if (startIdx != std::string::npos) {
+ startIdx += 6; // Move past "(msg:)"
+ size_t endIdx = payload.rfind("}");
+ if (endIdx != std::string::npos)
+ payload = "{" + payload.substr(startIdx, endIdx - startIdx);
+ else
+ NODELET_ERROR("not the expected json format");
+ } else
+ NODELET_ERROR("not the expected json format");
+
+ // serialize the json string into payload_buffer
+ payload_buffer = std::vector(payload.begin(), payload.end());
+ }
+
+ else if (ros2mqtt.primitive) { // publish as primitive (string) message
// resolve ROS messages to primitive strings if possible
std::string payload;
@@ -743,6 +812,30 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
msg_offset += stamp_length_;
}
+if (mqtt2ros.json) {
+ // the message is a serialized json string
+ std::vector msg_buffer;
+ std::string msgStr = mqtt_msg->get_payload_str();
+
+ // convert json into a serialized ROS message, using the json_parser object
+ msg_buffer= mqtt2ros.json_parser->serializeFromJson( mqtt_topic, &msgStr);
+
+ // copy ROS message from MQTT message to generic message buffer
+ rclcpp::SerializedMessage serialized_msg(msg_buffer.size());
+ std::memcpy(serialized_msg.get_rcl_serialized_message().buffer,
+ &msg_buffer[0], msg_buffer.size());
+ serialized_msg.get_rcl_serialized_message().buffer_length = msg_buffer.size();
+
+ // publish generic ROS message
+ RCLCPP_DEBUG(
+ get_logger(),
+ "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
+ mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str());
+ mqtt2ros.ros.publisher->publish(serialized_msg);
+
+
+ }
+ else{
// copy ROS message from MQTT message to generic message buffer
rclcpp::SerializedMessage serialized_msg(msg_length);
std::memcpy(serialized_msg.get_rcl_serialized_message().buffer,
@@ -755,7 +848,7 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
"Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str());
mqtt2ros.ros.publisher->publish(serialized_msg);
-}
+}}
void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
From 8930cd383fd6781edd95420329f05476c2e2484e Mon Sep 17 00:00:00 2001
From: ahmad-ra
Date: Mon, 4 Mar 2024 11:41:33 +0100
Subject: [PATCH 3/5] minor fix
---
mqtt_client/CMakeLists.txt | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/mqtt_client/CMakeLists.txt b/mqtt_client/CMakeLists.txt
index 28d6fb0..fd8013c 100644
--- a/mqtt_client/CMakeLists.txt
+++ b/mqtt_client/CMakeLists.txt
@@ -26,7 +26,7 @@ if(${ROS_VERSION} EQUAL 2)
# find_package(PahoMqttCpp REQUIRED)
find_library(PahoMqttC_LIBRARY libpaho-mqtt3as.so.1 REQUIRED)
find_library(PahoMqttCpp_LIBRARY libpaho-mqttpp3.so.1 REQUIRED)
- set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../rosx_introspection/include)
+ set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../rosx_introspection/include)
set(rox_introspection_LIBRARY ${ROSX_INTROSPECTION_INCLUDE_DIR}/../build/librosx_introspection.a)
add_library(${PROJECT_NAME}_lib SHARED src/MqttClient.ros2.cpp)
From b446a1e8de2551de9cf351fc5f7b5b31ce988249 Mon Sep 17 00:00:00 2001
From: ahmad-ra
Date: Mon, 18 Mar 2024 09:06:09 +0200
Subject: [PATCH 4/5] ROS2 Support, fix
---
mqtt_client/CMakeLists.txt | 6 ++-
mqtt_client/config/params.ros2.yaml | 2 +
.../include/mqtt_client/MqttClient.ros2.hpp | 5 ++-
mqtt_client/package.xml | 2 +
mqtt_client/src/MqttClient.ros2.cpp | 45 ++++++++++---------
5 files changed, 36 insertions(+), 24 deletions(-)
diff --git a/mqtt_client/CMakeLists.txt b/mqtt_client/CMakeLists.txt
index 28d6fb0..b872df9 100644
--- a/mqtt_client/CMakeLists.txt
+++ b/mqtt_client/CMakeLists.txt
@@ -19,6 +19,8 @@ if(${ROS_VERSION} EQUAL 2)
find_package(mqtt_client_interfaces REQUIRED)
find_package(rclcpp REQUIRED)
find_package(rclcpp_components REQUIRED)
+ find_package(rosbag2_cpp REQUIRED)
+ find_package(rosidl_typesupport_cpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(fastcdr REQUIRED)
@@ -26,7 +28,7 @@ if(${ROS_VERSION} EQUAL 2)
# find_package(PahoMqttCpp REQUIRED)
find_library(PahoMqttC_LIBRARY libpaho-mqtt3as.so.1 REQUIRED)
find_library(PahoMqttCpp_LIBRARY libpaho-mqttpp3.so.1 REQUIRED)
- set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../rosx_introspection/include)
+ set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../rosx_introspection/include)
set(rox_introspection_LIBRARY ${ROSX_INTROSPECTION_INCLUDE_DIR}/../build/librosx_introspection.a)
add_library(${PROJECT_NAME}_lib SHARED src/MqttClient.ros2.cpp)
@@ -54,6 +56,8 @@ if(${ROS_VERSION} EQUAL 2)
mqtt_client_interfaces
rclcpp
rclcpp_components
+ rosbag2_cpp
+ rosidl_typesupport_cpp
std_msgs
)
diff --git a/mqtt_client/config/params.ros2.yaml b/mqtt_client/config/params.ros2.yaml
index 5c6df70..d79867b 100644
--- a/mqtt_client/config/params.ros2.yaml
+++ b/mqtt_client/config/params.ros2.yaml
@@ -9,8 +9,10 @@ mqtt_client:
- /ping/ros
/ping/ros:
mqtt_topic: pingpong/ros
+ json: true
mqtt2ros:
mqtt_topics:
- pingpong/ros
pingpong/ros:
ros_topic: /pong/ros
+ json: true
diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
index 6e65f31..e77dfac 100644
--- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
+++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp
@@ -42,6 +42,7 @@ SOFTWARE.
#include
#include
#include
+#include "rosx_introspection/ros_utils/ros2_helpers.hpp"
#include "rapidjson/document.h"
#include "rapidjson/stringbuffer.h"
#include "rapidjson/writer.h"
@@ -409,7 +410,7 @@ class MqttClient : public rclcpp::Node,
bool primitive = false; ///< whether to publish as primitive message
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
- std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
+ std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
bool stamped = false; ///< whether to inject timestamp in MQTT message
};
@@ -434,7 +435,7 @@ class MqttClient : public rclcpp::Node,
///< coming from non-ROS MQTT client)
bool json = false; ///< whether the serial messages flowing through MQTT
///< broker are JSON format
- std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
+ std::shared_ptr< RosMsgParser::ParsersCollection> json_parser; ///< parser from json to ROS message and vice-versa
bool stamped = false; ///< whether timestamp is injected
};
diff --git a/mqtt_client/package.xml b/mqtt_client/package.xml
index 73888f4..296e909 100644
--- a/mqtt_client/package.xml
+++ b/mqtt_client/package.xml
@@ -29,6 +29,8 @@
rclcpp
rclcpp_components
rcpputils
+ rosbag2_cpp
+ rosidl_typesupport_cpp
catkin
diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp
index e8488d5..00bdf9f 100644
--- a/mqtt_client/src/MqttClient.ros2.cpp
+++ b/mqtt_client/src/MqttClient.ros2.cpp
@@ -200,6 +200,8 @@ void MqttClient::loadParameters() {
declare_parameter(fmt::format("bridge.ros2mqtt.{}.mqtt_topic", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
param_desc.description = "whether to publish as primitive message";
declare_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
+ param_desc.description = "whether the message is in json format";
+ declare_parameter(fmt::format("bridge.ros2mqtt.{}.json", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
param_desc.description = "whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)";
declare_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
param_desc.description = "ROS subscriber queue size";
@@ -218,6 +220,8 @@ void MqttClient::loadParameters() {
declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
param_desc.description = "whether to publish as primitive message (if coming from non-ROS MQTT client)";
declare_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
+ param_desc.description = "whether the message is in json format";
+ declare_parameter(fmt::format("bridge.mqtt2ros.{}.json", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
param_desc.description = "MQTT QoS value";
declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
param_desc.description = "ROS publisher queue size";
@@ -355,9 +359,9 @@ void MqttClient::loadParameters() {
if (get_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), primitive_param))
mqtt2ros.primitive = primitive_param.as_bool();
- // ros2mqtt[k]/json
+ // mqtt2ros[k]/json
rclcpp::Parameter json_param;
- if (get_parameter(fmt::format("bridge.mqtt2ros.{}.json", ros_topic), json_param))
+ if (get_parameter(fmt::format("bridge.mqtt2ros.{}.json", mqtt_topic), json_param))
mqtt2ros.json = json_param.as_bool();
// mqtt2ros[k]/advanced/mqtt/qos
@@ -607,10 +611,11 @@ void MqttClient::ros2mqtt(
ros_msg_type.name.c_str(), ros_topic.c_str());
if (ros2mqtt.json) {
-
+ Mqtt2RosInterface& mqtt2ros = mqtt2ros_[ros2mqtt.mqtt.topic];
+
// if ROS message type has changed or if mapping is stale
- if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) {
-
+ if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale || ros2mqtt.json_parser.get() == nullptr ) {
+
mqtt2ros.ros.msg_type = ros_msg_type.name;
mqtt2ros.stamped = ros_msg_type.stamped;
RCLCPP_INFO(get_logger(),
@@ -629,24 +634,22 @@ void MqttClient::ros2mqtt(
mqtt2ros.ros.is_stale = false;
// subscribe to MQTT topic with actual ROS messages
- client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);
- RCLCPP_DEBUG(get_logger(), "Subscribed MQTT topic '%s'",
- mqtt_data_topic.c_str());
-
+ client_->subscribe(ros2mqtt.mqtt.topic, mqtt2ros.mqtt.qos);
+
// configure json parser
- ros2mqtt.json_parser = std::make_shared>();
+ ros2mqtt.json_parser = std::make_shared>();
ros2mqtt.json_parser->registerParser(ros2mqtt.mqtt.topic, RosMsgParser::ROSType(ros_msg_type.name), RosMsgParser::GetMessageDefinition(ros_msg_type.name));
mqtt2ros_[ros2mqtt.mqtt.topic].json_parser = ros2mqtt.json_parser;
}
- // deserialize the shapeshifter message into json
- std::vector tmp_buffer;
+ // deserialize the message into json
+ const uint8_t* data_ptr = serialized_msg->get_rcl_serialized_message().buffer;
+ size_t data_size = serialized_msg->get_rcl_serialized_message().buffer_length;
+ std::vector tmp_buffer(data_size);
+ std::copy(data_ptr, data_ptr + data_size, tmp_buffer.begin());
std::string payload;
- tmp_buffer.resize(ros_msg->size());
- ros::serialization::OStream stream(tmp_buffer.data(), tmp_buffer.size());
- ros_msg->write(stream);
payload =*(ros2mqtt.json_parser->deserializeIntoJson(ros2mqtt.mqtt.topic,RosMsgParser::Span(tmp_buffer),false));
// parse the created json string from format {topic:{t} , msg: {m}} into {m}
@@ -657,9 +660,9 @@ void MqttClient::ros2mqtt(
if (endIdx != std::string::npos)
payload = "{" + payload.substr(startIdx, endIdx - startIdx);
else
- NODELET_ERROR("not the expected json format");
+ RCLCPP_ERROR(get_logger(),"not the expected json format");
} else
- NODELET_ERROR("not the expected json format");
+ RCLCPP_ERROR(get_logger(),"not the expected json format");
// serialize the json string into payload_buffer
payload_buffer = std::vector(payload.begin(), payload.end());
@@ -811,7 +814,7 @@ void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
msg_length -= stamp_length_;
msg_offset += stamp_length_;
}
-
+
if (mqtt2ros.json) {
// the message is a serialized json string
std::vector msg_buffer;
@@ -819,11 +822,11 @@ if (mqtt2ros.json) {
// convert json into a serialized ROS message, using the json_parser object
msg_buffer= mqtt2ros.json_parser->serializeFromJson( mqtt_topic, &msgStr);
-
+
// copy ROS message from MQTT message to generic message buffer
rclcpp::SerializedMessage serialized_msg(msg_buffer.size());
std::memcpy(serialized_msg.get_rcl_serialized_message().buffer,
- &msg_buffer[0], msg_buffer.size());
+ &(msg_buffer[0]), msg_buffer.size());
serialized_msg.get_rcl_serialized_message().buffer_length = msg_buffer.size();
// publish generic ROS message
@@ -1138,7 +1141,7 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
// subscribe to MQTT topic with actual ROS messages
client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);
RCLCPP_DEBUG(get_logger(), "Subscribed MQTT topic '%s'",
- mqtt_data_topic.c_str());
+ mqtt_data_topic.c_str());
}
} else {
From e5cc36ef5004947a188f6cfd86dfd91c6309872d Mon Sep 17 00:00:00 2001
From: ahmad-ra
Date: Sat, 13 Apr 2024 13:30:11 +0200
Subject: [PATCH 5/5] _Change the dependency (rosx_introspection) to be a
catkin/colcon package, rather than a static lib built with cmake
---
README.md | 19 +++----------------
mqtt_client/CMakeLists.txt | 14 ++++++--------
mqtt_client/config/params.yaml | 4 ++++
mqtt_client/package.xml | 1 +
4 files changed, 14 insertions(+), 24 deletions(-)
diff --git a/README.md b/README.md
index a706472..d018008 100644
--- a/README.md
+++ b/README.md
@@ -200,22 +200,9 @@ pose:
w: 1.0"
# ROS2
-ros2 topic pub /ping/json geometry_msgs/PoseStamped "header:
- seq: 1
- stamp:
- secs: 0
- nsecs: 0
- frame_id: 'map'
-pose:
- position:
- x: 1.0
- y: 2.0
- z: 3.0
- orientation:
- x: 0.0
- y: 0.0
- z: 0.0
- w: 1.0"
+
+ros2 topic pub /ping/json geometry_msgs/PoseStamped '{header: {stamp: {sec: 1, nanosec: 50}, frame_id: "map"}, pose: {position: {x: 1.2, y: 3.4, z: 5.6}, orientation: {x: 7.8, y: 0.0, z: 0.0, w: 1.0}}}'
+
```
diff --git a/mqtt_client/CMakeLists.txt b/mqtt_client/CMakeLists.txt
index b872df9..03aedf7 100644
--- a/mqtt_client/CMakeLists.txt
+++ b/mqtt_client/CMakeLists.txt
@@ -23,13 +23,12 @@ if(${ROS_VERSION} EQUAL 2)
find_package(rosidl_typesupport_cpp REQUIRED)
find_package(std_msgs REQUIRED)
find_package(fastcdr REQUIRED)
+ find_package(rosx_introspection REQUIRED)
# Paho MQTT C++ apt package doesn't include CMake config
# find_package(PahoMqttCpp REQUIRED)
find_library(PahoMqttC_LIBRARY libpaho-mqtt3as.so.1 REQUIRED)
find_library(PahoMqttCpp_LIBRARY libpaho-mqttpp3.so.1 REQUIRED)
- set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../rosx_introspection/include)
- set(rox_introspection_LIBRARY ${ROSX_INTROSPECTION_INCLUDE_DIR}/../build/librosx_introspection.a)
add_library(${PROJECT_NAME}_lib SHARED src/MqttClient.ros2.cpp)
@@ -41,14 +40,15 @@ if(${ROS_VERSION} EQUAL 2)
target_include_directories(${PROJECT_NAME}_lib PUBLIC
$
$
- ${ROSX_INTROSPECTION_INCLUDE_DIR}
+ ${rosx_introspection_INCLUDE_DIRS}
)
target_link_libraries(${PROJECT_NAME}_lib
${PahoMqttC_LIBRARY}
${PahoMqttCpp_LIBRARY}
- ${rox_introspection_LIBRARY}
+ ${rosx_introspection_LIBRARIES}
fastcdr
+
)
ament_target_dependencies(${PROJECT_NAME}_lib
@@ -59,6 +59,7 @@ if(${ROS_VERSION} EQUAL 2)
rosbag2_cpp
rosidl_typesupport_cpp
std_msgs
+ rosx_introspection
)
install(TARGETS ${PROJECT_NAME}_lib
@@ -106,14 +107,13 @@ elseif(${ROS_VERSION} EQUAL 1)
rosfmt ## For fmt::format.
std_msgs
topic_tools
+ rosx_introspection
)
## System dependencies are found with CMake's conventions
find_package(PahoMqttCpp REQUIRED)
set(PahoMqttCpp_LIBRARIES PahoMqttCpp::paho-mqttpp3)
find_package(fastcdr REQUIRED)
- set(ROSX_INTROSPECTION_INCLUDE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/../../rosx_introspection/include)
- set(rox_introspection_LIBRARY ${ROSX_INTROSPECTION_INCLUDE_DIR}/../build/librosx_introspection.a)
## Uncomment this if the package has a setup.py. This macro ensures
## modules and global scripts declared therein get installed
@@ -219,7 +219,6 @@ elseif(${ROS_VERSION} EQUAL 1)
include_directories(
include
${catkin_INCLUDE_DIRS}
- ${ROSX_INTROSPECTION_INCLUDE_DIR}
)
## Declare a C++ library
@@ -251,7 +250,6 @@ elseif(${ROS_VERSION} EQUAL 1)
target_link_libraries(${PROJECT_NAME}
${catkin_LIBRARIES}
${PahoMqttCpp_LIBRARIES}
- ${rox_introspection_LIBRARY}
fastcdr
)
diff --git a/mqtt_client/config/params.yaml b/mqtt_client/config/params.yaml
index 617f94d..ddad0f8 100644
--- a/mqtt_client/config/params.yaml
+++ b/mqtt_client/config/params.yaml
@@ -5,6 +5,8 @@ bridge:
ros2mqtt:
- ros_topic: /ping/ros
mqtt_topic: pingpong/ros
+ - ros_topic: /ping/json
+ mqtt_topic: pingpong/json
json: true
- ros_topic: /ping/primitive
mqtt_topic: pingpong/primitive
@@ -12,6 +14,8 @@ bridge:
mqtt2ros:
- mqtt_topic: pingpong/ros
ros_topic: /pong/ros
+ - mqtt_topic: pingpong/json
+ ros_topic: /pong/json
json: true
- mqtt_topic: pingpong/primitive
ros_topic: /pong/primitive
diff --git a/mqtt_client/package.xml b/mqtt_client/package.xml
index 296e909..bfdcac9 100644
--- a/mqtt_client/package.xml
+++ b/mqtt_client/package.xml
@@ -20,6 +20,7 @@
mqtt_client_interfaces
ros_environment
std_msgs
+ rosx_introspection
ament_cmake