Skip to content

Commit 70f3370

Browse files
authored
Merge pull request #6 from ika-rwth-aachen/feature/primitive-msgs
Support exchange of primitive messages with other MQTT clients
2 parents b7aa23f + f4530a2 commit 70f3370

File tree

4 files changed

+465
-101
lines changed

4 files changed

+465
-101
lines changed

README.md

Lines changed: 64 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -5,16 +5,17 @@
55
<img src="https://img.shields.io/github/v/release/ika-rwth-aachen/mqtt_client"/></a>
66
<img src="https://img.shields.io/github/license/ika-rwth-aachen/mqtt_client"/></a>
77
<a href="https://github.com/ika-rwth-aachen/mqtt_client/actions/workflows/build.yml"><img src="https://github.com/ika-rwth-aachen/mqtt_client/actions/workflows/build.yml/badge.svg"/></a>
8-
<img src="https://img.shields.io/github/stars/ika-rwth-aachen/mqtt_client?style=social"/></a>
8+
<a href="https://github.com/ika-rwth-aachen/mqtt_client"><img src="https://img.shields.io/github/stars/ika-rwth-aachen/mqtt_client?style=social"/></a>
99
</p>
1010

11-
The *mqtt_client* package provides a ROS nodelet that enables connected ROS-based devices or robots to exchange ROS messages via an MQTT broker using the [MQTT](http://mqtt.org) protocol. This works generically for arbitrary ROS message types.
11+
The *mqtt_client* package provides a ROS nodelet that enables connected ROS-based devices or robots to exchange ROS messages via an MQTT broker using the [MQTT](http://mqtt.org) protocol. This works generically for arbitrary ROS message types. The *mqtt_client* can also exchange primitive messages with MQTT clients running on devices not based on ROS.
1212

1313
- [Installation](#installation)
1414
- [Usage](#usage)
1515
- [Quick Start](#quick-start)
1616
- [Launch](#launch)
1717
- [Configuration](#configuration)
18+
- [Primitive Messages](#primitive-messages)
1819
- [Latency Computation](#latency-computation)
1920
- [Package Summary](#package-summary)
2021
- [How It Works](#how-it-works)
@@ -57,20 +58,28 @@ docker run --rm --network host --name mosquitto eclipse-mosquitto
5758

5859
The *mqtt_client* is best configured with a ROS parameter *yaml* file. The configuration shown below (also see [`params.yaml`](launch/params.yaml)) allows an exchange of messages as follows:
5960

60-
- ROS messages received locally on topic `/ping` are sent to the broker on MQTT topic `pingpong`
61-
- MQTT messages received from the broker on MQTT topic `pingpong` are published locally on ROS topic `/pong`
61+
- ROS messages received locally on ROS topic `/ping/ros` are sent to the broker on MQTT topic `pingpong/ros`;
62+
- MQTT messages received from the broker on MQTT topic `pingpong/ros` are published locally on ROS topic `/pong/ros`;
63+
- primitive ROS messages received locally on ROS topic `/ping/primitive` are sent as primitive (string) messages to the broker on MQTT topic `pingpong/primitive`;
64+
- MQTT messages received from the broker on MQTT topic `pingpong/primitive` are published locally as primitive ROS messages on ROS topic `/pong/primitive`.
6265

6366
```yaml
6467
broker:
6568
host: localhost
6669
port: 1883
6770
bridge:
6871
ros2mqtt:
69-
- ros_topic: /ping
70-
mqtt_topic: pingpong
72+
- ros_topic: /ping/ros
73+
mqtt_topic: pingpong/ros
74+
- ros_topic: /ping/primitive
75+
mqtt_topic: pingpong/primitive
76+
primitive: true
7177
mqtt2ros:
72-
- mqtt_topic: pingpong
73-
ros_topic: /pong
78+
- mqtt_topic: pingpong/ros
79+
ros_topic: /pong/ros
80+
- mqtt_topic: pingpong/primitive
81+
ros_topic: /pong/primitive
82+
primitive: true
7483
```
7584
7685
#### Demo Client Launch
@@ -82,33 +91,52 @@ roslaunch mqtt_client standalone.launch
8291
```
8392

8493
```txt
85-
[ WARN] [1652888439.858857758]: Parameter 'broker/tls/enabled' not set, defaulting to '0'
86-
[ WARN] [1652888439.859706411]: Parameter 'client/id' not set, defaulting to ''
87-
[ WARN] [1652888439.859717540]: Client buffer can not be enabled when client ID is empty
88-
[ WARN] [1652888439.860144376]: Parameter 'client/clean_session' not set, defaulting to '1'
89-
[ WARN] [1652888439.860366209]: Parameter 'client/keep_alive_interval' not set, defaulting to '0.000000'
90-
[ WARN] [1652888439.860583442]: Parameter 'client/max_inflight' not set, defaulting to '65535'
91-
[ INFO] [1652888439.860924591]: Bridging ROS topic '/ping' to MQTT topic 'pingpong'
92-
[ INFO] [1652888439.860967600]: Bridging MQTT topic 'pingpong' to ROS topic '/pong'
93-
[ INFO] [1652888439.861800203]: Connecting to broker at 'tcp://localhost:1883' ...
94-
[ INFO] [1652888440.062255501]: Connected to broker at 'tcp://localhost:1883'
94+
[ WARN] [1665575657.358869079]: Parameter 'broker/tls/enabled' not set, defaulting to '0'
95+
[ WARN] [1665575657.359798329]: Parameter 'client/id' not set, defaulting to ''
96+
[ WARN] [1665575657.359810889]: Client buffer can not be enabled when client ID is empty
97+
[ WARN] [1665575657.360300703]: Parameter 'client/clean_session' not set, defaulting to '1'
98+
[ WARN] [1665575657.360576344]: Parameter 'client/keep_alive_interval' not set, defaulting to '60.000000'
99+
[ WARN] [1665575657.360847295]: Parameter 'client/max_inflight' not set, defaulting to '65535'
100+
[ INFO] [1665575657.361281461]: Bridging ROS topic '/ping/ros' to MQTT topic 'pingpong/ros'
101+
[ INFO] [1665575657.361303380]: Bridging primitive ROS topic '/ping/primitive' to MQTT topic 'pingpong/primitive'
102+
[ INFO] [1665575657.361352809]: Bridging MQTT topic 'pingpong/ros' to ROS topic '/pong/ros'
103+
[ INFO] [1665575657.361370558]: Bridging MQTT topic 'pingpong/primitive' to primitive ROS topic '/pong/primitive'
104+
[ INFO] [1665575657.362153083]: Connecting to broker at 'tcp://localhost:1883' ...
105+
[ INFO] [1665575657.462622065]: Connected to broker at 'tcp://localhost:1883'
95106
```
96107

97-
Note that the *mqtt_client* successfully connected to the broker and also echoed which ROS/MQTT topics are being bridged.
108+
Note that the *mqtt_client* successfully connected to the broker and also echoed which ROS/MQTT topics are being bridged. For testing the communication between *mqtt_client*, itself, and other MQTT clients, open five new terminals.
98109

99-
In order to test the communication, publish any message on ROS topic `/ping` and wait for a response on ROS topic `/pong`. To this end, open two new terminals and execute the following commands.
110+
In order to test the communication among *mqtt_clients*, publish any ROS message on ROS topic `/ping/ros` and wait for a response on ROS topic `/pong/ros`.
100111

101112
```bash
102-
# 1st terminal: listen on /pong
103-
rostopic echo /pong
113+
# 1st terminal: listen for ROS messages on /pong/ros
114+
rostopic echo /pong/ros
104115
```
105116

106117
```bash
107-
# 2nd terminal: publish to /ping
108-
rostopic pub -r 1 /ping std_msgs/String "Hello MQTT!"
118+
# 2nd terminal: publish ROS message to /ping/ros
119+
rostopic pub -r 1 /ping/ros std_msgs/String "Hello MQTT!"
109120
```
110121

111-
If everything works as expected, a new message should be printed in the first terminal once a second.
122+
In order to test the communication between *mqtt_client* and other MQTT clients, publish a primitive ROS message on ROS topic `/ping/primitive`, directly publish a primitive MQTT message on MQTT topic `pingpong/primitive` and wait for responses on ROS topic `/pong/primitive`.
123+
124+
```bash
125+
# 3rd terminal: listen for primitive ROS messages on /pong/primitive
126+
rostopic echo /pong/primitive
127+
```
128+
129+
```bash
130+
# 4th terminal: publish primitive ROS message to /ping/primitive
131+
rostopic pub -r 1 /ping/primitive std_msgs/Int32 42
132+
```
133+
134+
```bash
135+
# 5th terminal: publish primitive MQTT message to pingpong/primitive
136+
docker run --rm --network host eclipse-mosquitto mosquitto_pub -h localhost -t "pingpong/primitive" --repeat 20 --repeat-delay 1 -m 69
137+
```
138+
139+
If everything works as expected, the second terminal should print a message at 1Hz, while the third terminal should print two different messages at 1Hz.
112140

113141
### Launch
114142

@@ -178,6 +206,7 @@ bridge:
178206
ros2mqtt: # array specifying which ROS topics to map to which MQTT topics
179207
- ros_topic: # ROS topic whose messages are transformed to MQTT messages
180208
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
209+
primitive: # [false] whether to publish as primitive message
181210
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
182211
advanced:
183212
ros:
@@ -188,6 +217,7 @@ bridge:
188217
mqtt2ros: # array specifying which MQTT topics to map to which ROS topics
189218
- mqtt_topic: # MQTT topic on which messages are received from the broker
190219
ros_topic: # ROS topic on which corresponding MQTT messages are published
220+
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
191221
advanced:
192222
mqtt:
193223
qos: # [0] MQTT QoS value
@@ -196,9 +226,17 @@ bridge:
196226
latched: # [false] whether to latch ROS message
197227
```
198228
229+
## Primitive Messages
230+
231+
As seen in the [Quick Start](#quick-start), the *mqtt_client* can not only exchange arbitrary ROS messages with other *mqtt_clients*, but it can also exchange primitive message data with other non-*mqtt_client* MQTT clients. This allows ROS-based devices to exchange primitive messages with devices not based on ROS. The `primitive` parameter can be set for both ROS-to-MQTT (`bridge/ros2mqtt`) and for MQTT-to-ROS (`bridge/mqtt2ros`) transmissions.
232+
233+
If a ROS-to-MQTT transmission is configured as `primitive`, the ROS message is simply serialized to a string representation, without providing any information on the underlying ROS message type via MQTT. If the ROS message type is one of the supported primitive ROS message types, the encapsulating ROS message components are also removed, s.t. only the raw data is published as a string. The supported primitive ROS message types are [`std_msgs/String`](http://docs.ros.org/en/api/std_msgs/html/msg/String.html), [`std_msgs/Bool`](http://docs.ros.org/en/api/std_msgs/html/msg/Bool.html), [`std_msgs/Char`](http://docs.ros.org/en/api/std_msgs/html/msg/Char.html), [`std_msgs/UInt8`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt8.html), [`std_msgs/UInt16`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt16.html), [`std_msgs/UInt32`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt32.html), [`std_msgs/UInt64`](http://docs.ros.org/en/api/std_msgs/html/msg/UInt16.html), [`std_msgs/Int8`](http://docs.ros.org/en/api/std_msgs/html/msg/Int8.html), [`std_msgs/Int16`](http://docs.ros.org/en/api/std_msgs/html/msg/Int16.html), [`std_msgs/Int32`](http://docs.ros.org/en/api/std_msgs/html/msg/Int32.html), [`std_msgs/Int64`](http://docs.ros.org/en/api/std_msgs/html/msg/Int64.html), [`std_msgs/Float32`](http://docs.ros.org/en/api/std_msgs/html/msg/Float32.html), [`std_msgs/Float32`](http://docs.ros.org/en/api/std_msgs/html/msg/Float64.html).
234+
235+
If an MQTT-to-ROS transmission is configured as `primitive`, the MQTT message is interpreted and published as a primitive data type, if possible. The message is probed in the following order: `bool` ([`std_msgs/Bool`](http://docs.ros.org/en/api/std_msgs/html/msg/Bool.html)), `int` ([`std_msgs/Int32`](http://docs.ros.org/en/api/std_msgs/html/msg/Int32.html)), `float` ([`std_msgs/Float32`](http://docs.ros.org/en/api/std_msgs/html/msg/Float32.html)), `string` ([`std_msgs/String`](http://docs.ros.org/en/api/std_msgs/html/msg/String.html)).
236+
199237
## Latency Computation
200238

201-
The *mqtt_client* provides built-in functionality to measure the latency of transferring a ROS message via an MQTT broker back to ROS. To this end, the sending client injects the current timestamp into the MQTT message. The receiving client can then compute the latency between message reception time and the injected timestamp. **Naturally, this is only accurate to the level of synchronization between clocks on sending and receiving machine.**
239+
The *mqtt_client* provides built-in functionality to measure the latency of transferring a ROS message via an MQTT broker back to ROS. Note that this functionality is only available for non-primitive messages (see [Primitive Messages](#primitive-messages)). To this end, the sending client injects the current timestamp into the MQTT message. The receiving client can then compute the latency between message reception time and the injected timestamp. **Naturally, this is only accurate to the level of synchronization between clocks on sending and receiving machine.**
202240

203241
In order to inject the current timestamp into outgoing MQTT messages, the parameter `inject_timestamp` has to be set for the corresponding `bridge/ros2mqtt` entry. The receiving *mqtt_client* will then automatically publish the measured latency in seconds as a ROS `std_msgs/Float64` message on topic `/<mqtt_client_name>/latencies/<mqtt2ros/ros_topic>`.
204242

include/mqtt_client/MqttClient.h

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -192,6 +192,26 @@ class MqttClient : public nodelet::Nodelet,
192192
void mqtt2ros(mqtt::const_message_ptr mqtt_msg,
193193
const ros::WallTime& arrival_stamp = ros::WallTime::now());
194194

195+
/**
196+
* @brief Publishes a primitive message received via MQTT to ROS.
197+
*
198+
* This tries to interpret the raw MQTT message as a bool, int, or float value
199+
* in the given order before falling back to string. The message is then
200+
* published as a corresponding primitive ROS message. This utilizes the
201+
* ShapeShifter stored for the MQTT topic on which the message was received.
202+
* The ShapeShifter is dynamically configured to the appropriate ROS message
203+
* type.
204+
*
205+
* The following mappings from primitive type to ROS message type hold:
206+
* bool: std_msgs/Bool
207+
* int: std_msgs/Int32
208+
* float: std_msgs/Float32
209+
* string: std_msgs/String
210+
*
211+
* @param mqtt_msg MQTT message
212+
*/
213+
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg);
214+
195215
/**
196216
* @brief Callback for when the client has successfully connected to the
197217
* broker.
@@ -328,6 +348,7 @@ class MqttClient : public nodelet::Nodelet,
328348
int qos = 0; ///< MQTT QoS value
329349
bool retained = false; ///< whether to retain MQTT message
330350
} mqtt; ///< MQTT-related variables
351+
bool primitive = false; ///< whether to publish as primitive message
331352
bool stamped = false; ///< whether to inject timestamp in MQTT message
332353
};
333354

@@ -346,6 +367,8 @@ class MqttClient : public nodelet::Nodelet,
346367
int queue_size = 1; ///< ROS publisher queue size
347368
bool latched = false; ///< whether to latch ROS message
348369
} ros; ///< ROS-related variables
370+
bool primitive = false; ///< whether to publish as primitive message (if
371+
///< coming from non-ROS MQTT client)
349372
};
350373

351374
protected:
@@ -439,4 +462,22 @@ bool MqttClient::loadParameter(const std::string& key, T& value,
439462
return found;
440463
}
441464

465+
466+
/**
467+
* Serializes a ROS message to a buffer.
468+
*
469+
* @tparam T ROS message type
470+
*
471+
* @param[in] msg ROS message
472+
* @param[out] buffer buffer to serialize to
473+
*/
474+
template <typename T>
475+
void serializeRosMessage(const T& msg, std::vector<uint8_t>& buffer) {
476+
477+
const uint32_t length = ros::serialization::serializationLength(msg);
478+
buffer.resize(length);
479+
ros::serialization::OStream stream(buffer.data(), length);
480+
ros::serialization::serialize(stream, msg);
481+
}
482+
442483
} // namespace mqtt_client

launch/params.yaml

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,14 @@ broker:
33
port: 1883
44
bridge:
55
ros2mqtt:
6-
- ros_topic: /ping
7-
mqtt_topic: pingpong
6+
- ros_topic: /ping/ros
7+
mqtt_topic: pingpong/ros
8+
- ros_topic: /ping/primitive
9+
mqtt_topic: pingpong/primitive
10+
primitive: true
811
mqtt2ros:
9-
- mqtt_topic: pingpong
10-
ros_topic: /pong
12+
- mqtt_topic: pingpong/ros
13+
ros_topic: /pong/ros
14+
- mqtt_topic: pingpong/primitive
15+
ros_topic: /pong/primitive
16+
primitive: true

0 commit comments

Comments
 (0)