Skip to content

Commit 0f5ec67

Browse files
committed
Make fixed type publishers ahead of time
- Updated the docs - Added in "system_default" qos values - Fixed type publishers are now made right away rather than waiting for an mqtt message to come in
1 parent 0bd52b2 commit 0f5ec67

File tree

3 files changed

+69
-18
lines changed

3 files changed

+69
-18
lines changed

README.md

Lines changed: 21 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -275,58 +275,66 @@ client:
275275
276276
```yaml
277277
bridge:
278-
ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics
278+
ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics
279279
- ros_topic: # ROS topic whose messages are transformed to MQTT messages
280280
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
281281
primitive: # [false] whether to publish as primitive message
282282
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
283283
advanced:
284284
ros:
285-
queue_size: # [1] ROS subscriber queue size
285+
queue_size: # [1] ROS subscriber queue size
286286
mqtt:
287-
qos: # [0] MQTT QoS value
288-
retained: # [false] whether to retain MQTT message
289-
mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics
287+
qos: # [0] MQTT QoS value
288+
retained: # [false] whether to retain MQTT message
289+
mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics
290290
- mqtt_topic: # MQTT topic on which messages are received from the broker
291291
ros_topic: # ROS topic on which corresponding MQTT messages are published
292292
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
293293
advanced:
294294
mqtt:
295-
qos: # [0] MQTT QoS value
295+
qos: # [0] MQTT QoS value
296296
ros:
297-
queue_size: # [1] ROS publisher queue size
298-
latched: # [false] whether to latch ROS message
297+
queue_size: # [1] ROS publisher queue size
298+
latched: # [false] whether to latch ROS message
299299
```
300300
301301
##### ROS 2
302302
303303
```yaml
304304
bridge:
305-
ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics
306-
ros_topics: # Array specifying which ROS topics to bridge
305+
ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics
306+
ros_topics: # Array specifying which ROS topics to bridge
307307
- {{ ros_topic_name }} # The ROS topic that should be bridged, corresponds to the sub-object in the YAML
308308
{{ ros_topic_name }}:
309309
mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker
310310
primitive: # [false] whether to publish as primitive message
311+
ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the publisher
311312
inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)
312313
advanced:
313314
ros:
314-
queue_size: # [1] ROS subscriber queue size
315+
queue_size: # [1] ROS subscriber queue size
316+
qos:
317+
reliability: # [auto] One of "auto", "system_default", "reliable", "best_effort". If auto, the QoS is automatically determined via the publisher
318+
durability: # [auto] One of "auto", "system_default", "volatile", "transient_local". If auto, the QoS is automatically determined via the publisher
315319
mqtt:
316-
qos: # [0] MQTT QoS value
317-
retained: # [false] whether to retain MQTT message
320+
qos: # [0] MQTT QoS value
321+
retained: # [false] whether to retain MQTT message
318322
mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics
319323
mqtt_topics: # Array specifying which ROS topics to bridge
320324
- {{ mqtt_topic_name }} # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML
321325
{{ mqtt_topic_name }}:
322326
ros_topic: # ROS topic on which corresponding MQTT messages are published
327+
ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the MQTT message
323328
primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client)
324329
advanced:
325330
mqtt:
326331
qos: # [0] MQTT QoS value
327332
ros:
328333
queue_size: # [1] ROS publisher queue size
329334
latched: # [false] whether to latch ROS message
335+
qos:
336+
reliability: # [system_default] One of "system_default", "reliable", "best_effort".
337+
durability: # [system_default] One of "system_default", "volatile", "transient_local".
330338
```
331339
332340
## Primitive Messages

mqtt_client/include/mqtt_client/MqttClient.ros2.hpp

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,10 +209,17 @@ class MqttClient : public rclcpp::Node,
209209
const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const;
210210

211211
/**
212-
* @brief Checks all active ROS topics in order to set up generic subscribers.
212+
* @brief Setup any subscriptions we can.
213+
*
214+
* These may be fixed type/QoS, or dynamically matched against active publisher
213215
*/
214216
void setupSubscriptions();
215217

218+
/**
219+
* @brief Setup any publishers that we can
220+
*/
221+
void setupPublishers();
222+
216223
/**
217224
* @brief Sets up the client connection options and initializes the client
218225
* object.

mqtt_client/src/MqttClient.ros2.cpp

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -448,7 +448,9 @@ void MqttClient::loadParameters() {
448448
rclcpp::Parameter durability_param;
449449
if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) {
450450
const auto p = durability_param.as_string();
451-
if (p == "volatile") {
451+
if (p == "system_default") {
452+
ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::SystemDefault;
453+
} else if (p == "volatile") {
452454
ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile;
453455
} else if (p == "transient_local") {
454456
ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::TransientLocal;
@@ -463,7 +465,9 @@ void MqttClient::loadParameters() {
463465
rclcpp::Parameter reliability_param;
464466
if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) {
465467
const auto p = reliability_param.as_string();
466-
if (p == "best_effort") {
468+
if (p == "system_default") {
469+
ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::SystemDefault;
470+
} else if (p == "best_effort") {
467471
ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort;
468472
} else if (p == "reliable") {
469473
ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable;
@@ -538,7 +542,9 @@ void MqttClient::loadParameters() {
538542
rclcpp::Parameter durability_param;
539543
if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) {
540544
const auto p = durability_param.as_string();
541-
if (p == "volatile") {
545+
if (p == "system_default") {
546+
mqtt2ros.ros.qos.durability = rclcpp::DurabilityPolicy::SystemDefault;
547+
} else if (p == "volatile") {
542548
mqtt2ros.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile;
543549
} else if (p == "transient_local") {
544550
mqtt2ros.ros.qos.durability =
@@ -552,7 +558,9 @@ void MqttClient::loadParameters() {
552558
rclcpp::Parameter reliability_param;
553559
if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) {
554560
const auto p = reliability_param.as_string();
555-
if (p == "best_effort") {
561+
if (p == "system_default") {
562+
mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::SystemDefault;
563+
} else if (p == "best_effort") {
556564
mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort;
557565
} else if (p == "reliable") {
558566
mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable;
@@ -664,6 +672,8 @@ void MqttClient::setup() {
664672
check_subscriptions_timer_ =
665673
create_wall_timer(std::chrono::duration<double>(1.0),
666674
std::bind(&MqttClient::setupSubscriptions, this));
675+
676+
setupPublishers ();
667677
}
668678

669679
std::optional<rclcpp::QoS> MqttClient::getCompatibleQoS (const std::string &ros_topic, const rclcpp::TopicEndpointInfo &tei,
@@ -812,6 +822,31 @@ void MqttClient::setupSubscriptions() {
812822
}
813823
}
814824

825+
void MqttClient::setupPublishers() {
826+
827+
for (auto& [mqtt_topic, mqtt2ros] : mqtt2ros_) {
828+
if (mqtt2ros.ros.publisher)
829+
continue;
830+
831+
// If the type is not fixed, we require a mqtt message in order to deduce the type
832+
if (!mqtt2ros.fixed_type)
833+
continue;
834+
835+
try {
836+
const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size)
837+
.durability(mqtt2ros.ros.qos.durability)
838+
.reliability(mqtt2ros.ros.qos.reliability);
839+
mqtt2ros.ros.publisher = create_generic_publisher(
840+
mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, qos);
841+
842+
mqtt2ros.ros.is_stale = false;
843+
} catch (const rclcpp::exceptions::RCLError& e) {
844+
RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s",
845+
e.what());
846+
}
847+
}
848+
}
849+
815850
void MqttClient::setupClient() {
816851

817852
// basic client connection options
@@ -1210,6 +1245,7 @@ void MqttClient::mqtt2fixed(mqtt::const_message_ptr mqtt_msg) {
12101245
} catch (const rclcpp::exceptions::RCLError& e) {
12111246
RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s",
12121247
e.what());
1248+
return;
12131249
}
12141250
}
12151251

0 commit comments

Comments
 (0)