Skip to content

Commit 0bd52b2

Browse files
committed
Allow auto or explicit selection of ros2mqtt subscriber QoS
1 parent 49ced1d commit 0bd52b2

File tree

3 files changed

+208
-34
lines changed

3 files changed

+208
-34
lines changed

mqtt_client/config/params.ros2.fixed.yaml

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,26 @@
99
- /ping/primitive
1010
/ping/primitive:
1111
mqtt_topic: pingpong/primitive
12-
primitive: true
12+
# This is implied by the explicit ros type
13+
#primitive: true
1314
ros_type: std_msgs/msg/Bool
15+
advanced:
16+
ros:
17+
queue_size: 10
18+
qos:
19+
durability: auto
20+
reliability: auto
1421
mqtt2ros:
1522
mqtt_topics:
1623
- pingpong/primitive
1724
pingpong/primitive:
1825
ros_topic: /pong/primitive
19-
primitive: true
26+
#This is implied by the explicit ros type
27+
#primitive: true
2028
ros_type: std_msgs/msg/Bool
29+
advanced:
30+
ros:
31+
queue_size: 10
32+
qos:
33+
durability: transient_local
34+
reliability: reliable

mqtt_client/include/mqtt_client/MqttClient.ros2.hpp

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ SOFTWARE.
3030
#include <filesystem>
3131
#include <map>
3232
#include <memory>
33+
#include <optional>
3334
#include <string>
3435

3536
#define FMT_HEADER_ONLY
@@ -72,6 +73,9 @@ class MqttClient : public rclcpp::Node,
7273
explicit MqttClient(const rclcpp::NodeOptions& options);
7374

7475
protected:
76+
class Ros2MqttInterface;
77+
class Mqtt2RosInterface;
78+
7579
/**
7680
* @brief Loads ROS parameters from parameter server.
7781
*/
@@ -179,6 +183,31 @@ class MqttClient : public rclcpp::Node,
179183
*/
180184
void setup();
181185

186+
/**
187+
* @brief Get the resolved compatible QOS from the interface and the endpoint
188+
*
189+
* This uses the two endpoints to decide upon a compatible QoS, resolving any "auto" QoS settings
190+
*
191+
* @param ros_topic the ROS topic we are looking on
192+
* @param tei Topic endpoint info
193+
* @param ros2mqtt the ROS to MQTT interface spec
194+
*
195+
* @returns The compatible QoS or nullopt if no compatible combination is found
196+
*/
197+
std::optional<rclcpp::QoS> getCompatibleQoS(
198+
const std::string& ros_topic, const rclcpp::TopicEndpointInfo& tei,
199+
const Ros2MqttInterface& ros2mqtt) const;
200+
201+
/**
202+
* @brief Get the candiate topic endpoints for subscription matching
203+
*
204+
* @param ros2mqtt the ROS to MQTT interface spec
205+
*
206+
* @returns The compatible QoS or nullopt if no compatible combination is found
207+
*/
208+
std::vector<rclcpp::TopicEndpointInfo> getCandidatePublishers(
209+
const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const;
210+
182211
/**
183212
* @brief Checks all active ROS topics in order to set up generic subscribers.
184213
*/
@@ -405,6 +434,11 @@ class MqttClient : public rclcpp::Node,
405434
std::string msg_type; ///< message type of subscriber
406435
int queue_size = 1; ///< ROS subscriber queue size
407436
bool is_stale = false; ///< whether a new generic publisher/subscriber is required
437+
struct {
438+
// If these are set to nullopt then that part of the QoS is determine automatically based on discovery
439+
std::optional<rclcpp::ReliabilityPolicy> reliability;
440+
std::optional<rclcpp::DurabilityPolicy> durability;
441+
} qos;
408442
} ros; ///< ROS-related variables
409443
struct {
410444
std::string topic; ///< MQTT topic

mqtt_client/src/MqttClient.ros2.cpp

Lines changed: 158 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -175,14 +175,15 @@ bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg,
175175
*
176176
* @param [in] serialized_msg generic serialized ROS message
177177
* @param [in] msg_type ROS message type, e.g. `std_msgs/msg/String`
178+
* @param [in] fixed_encoding Truthfully this is just an ugly hack to change the encoding of bools in some cases
178179
* @param [out] primitive string representation of primitive message data
179180
*
180181
* @return true if primitive ROS message type was found
181182
* @return false if ROS message type is not primitive
182183
*/
183184
bool primitiveRosMessageToString(
184185
const std::shared_ptr<rclcpp::SerializedMessage>& serialized_msg,
185-
const std::string& msg_type, std::string& primitive) {
186+
const std::string& msg_type, bool fixed_encoding, std::string& primitive) {
186187

187188
bool found_primitive = true;
188189

@@ -193,7 +194,10 @@ bool primitiveRosMessageToString(
193194
} else if (msg_type == "std_msgs/msg/Bool") {
194195
std_msgs::msg::Bool msg;
195196
deserializeRosMessage(*serialized_msg, msg);
196-
primitive = msg.data ? "1" : "0";
197+
if (fixed_encoding)
198+
primitive = msg.data ? "1" : "0";
199+
else
200+
primitive = msg.data ? "true" : "false";
197201
} else if (msg_type == "std_msgs/msg/Char") {
198202
std_msgs::msg::Char msg;
199203
deserializeRosMessage(*serialized_msg, msg);
@@ -311,6 +315,10 @@ void MqttClient::loadParameters() {
311315
declare_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
312316
param_desc.description = "ROS subscriber queue size";
313317
declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.queue_size", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
318+
param_desc.description = "ROS subscriber QoS reliability";
319+
declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
320+
param_desc.description = "ROS subscriber QoS durability";
321+
declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
314322
param_desc.description = "MQTT QoS value";
315323
declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
316324
param_desc.description = "whether to retain MQTT message";
@@ -437,6 +445,36 @@ void MqttClient::loadParameters() {
437445
queue_size_param))
438446
ros2mqtt.ros.queue_size = queue_size_param.as_int();
439447

448+
rclcpp::Parameter durability_param;
449+
if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) {
450+
const auto p = durability_param.as_string();
451+
if (p == "volatile") {
452+
ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile;
453+
} else if (p == "transient_local") {
454+
ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::TransientLocal;
455+
} else if (p == "auto") {
456+
ros2mqtt.ros.qos.durability = {};
457+
} else {
458+
RCLCPP_ERROR(get_logger(), "Unexpected durability %s", p.c_str ());
459+
exit(EXIT_FAILURE);
460+
}
461+
}
462+
463+
rclcpp::Parameter reliability_param;
464+
if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) {
465+
const auto p = reliability_param.as_string();
466+
if (p == "best_effort") {
467+
ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort;
468+
} else if (p == "reliable") {
469+
ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable;
470+
} else if (p == "auto") {
471+
ros2mqtt.ros.qos.reliability = {};
472+
} else {
473+
RCLCPP_ERROR(get_logger(), "Unexpected reliability %s", p.c_str ());
474+
exit(EXIT_FAILURE);
475+
}
476+
}
477+
440478
// ros2mqtt[k]/advanced/mqtt/qos
441479
rclcpp::Parameter qos_param;
442480
if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), qos_param))
@@ -628,64 +666,152 @@ void MqttClient::setup() {
628666
std::bind(&MqttClient::setupSubscriptions, this));
629667
}
630668

669+
std::optional<rclcpp::QoS> MqttClient::getCompatibleQoS (const std::string &ros_topic, const rclcpp::TopicEndpointInfo &tei,
670+
const Ros2MqttInterface &ros2mqtt) const
671+
{
672+
// the basic premise here is that we take the QoS from the publisher, overwrite any parts that are explicitly set
673+
// via configuration then check if the result is compatible with the publisher
674+
auto qos = tei.qos_profile ();
675+
676+
if (auto r = ros2mqtt.ros.qos.reliability)
677+
qos.reliability (*r);
678+
if (auto d = ros2mqtt.ros.qos.durability)
679+
qos.durability (*d);
680+
qos.keep_last (ros2mqtt.ros.queue_size);
681+
682+
const auto qres = rclcpp::qos_check_compatible (tei.qos_profile (), qos);
683+
684+
switch (qres.compatibility)
685+
{
686+
case rclcpp::QoSCompatibility::Ok:
687+
return qos;
688+
case rclcpp::QoSCompatibility::Warning:
689+
RCLCPP_DEBUG(get_logger(), "QoS compatibility warning on topic %s - %s", ros_topic.c_str(), qres.reason.c_str ());
690+
// presumably this is still compatible
691+
return qos;
692+
case rclcpp::QoSCompatibility::Error:
693+
default:
694+
return {};
695+
}
696+
697+
}
698+
699+
std::vector<rclcpp::TopicEndpointInfo> MqttClient::getCandidatePublishers(
700+
const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const
701+
{
702+
const auto &pubs = get_publishers_info_by_topic(ros_topic);
703+
704+
if (pubs.empty ())
705+
return {};
706+
707+
std::vector<rclcpp::TopicEndpointInfo> ret;
708+
709+
ret.reserve (pubs.size ());
710+
711+
for (const auto &p : pubs)
712+
{
713+
const std::string msg_type = p.topic_type();
714+
715+
// if the type isn't set, match aginst all t ypes, otherwise only match against mtching types
716+
if (ros2mqtt.ros.msg_type.empty () || msg_type == ros2mqtt.ros.msg_type)
717+
ret.push_back (p);
718+
}
719+
720+
// If we found any matching types, return those
721+
if (! ret.empty ())
722+
return ret;
723+
724+
// If we didn't and we aren't fix type, then just return the full set of publishers
725+
if (! ros2mqtt.fixed_type)
726+
return pubs;
727+
728+
// None of these publishers will work... :sad_panda:
729+
return {};
730+
}
631731

632732
void MqttClient::setupSubscriptions() {
633733

634-
// get info of all topics
635-
const auto all_topics_and_types = get_topic_names_and_types();
734+
// For each ros2mqtt interface, check if we need to do a lazy subscription (eg, lazy subscription would be if we
735+
// are determining either the type of QoS dynamically). If we don't, then just make the fixed subscriber if its not
736+
// already made.
737+
//
738+
// If we do, we check each publisher for a potential match. The first step is to filter down the list of
739+
// publishers based on the type. If we are fixed type then that list only includes publishers with matching types.
740+
// If we aren't its a little trickier. For dynamic types, if we already have matched against a type previously, the
741+
// candidate list will include all publishers which have matching types if any exist. If none of the publishers
742+
// have matching types, then the list will include all publishers.
743+
//
744+
// Then for each candidate publisher, check if their QoS is compatible with the QoS specic in the configuration. This
745+
// is really just needed because there is the potential for someone to set half of the QoS to auto and half to
746+
// explicit.
747+
// Condition for requiring "lazy" subscription where we need to walk the
748+
const auto requires_lazy_subscription = [] (const Ros2MqttInterface &ros2mqtt)
749+
{
750+
if (! ros2mqtt.fixed_type)
751+
return true;
752+
if (ros2mqtt.ros.qos.reliability == std::nullopt ||
753+
ros2mqtt.ros.qos.durability == std::nullopt)
754+
return true;
755+
return false;
756+
};
636757

637-
// check for ros2mqtt topics
638758
for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) {
639759

640-
std::function<void(const std::shared_ptr<rclcpp::SerializedMessage> msg)>
641-
bound_callback_func =
760+
std::function<void(const std::shared_ptr<rclcpp::SerializedMessage> msg)> bound_callback_func =
642761
std::bind(&MqttClient::ros2mqtt, this, std::placeholders::_1, ros_topic);
643762

644-
if (ros2mqtt.fixed_type && ! ros2mqtt.ros.subscriber) {
763+
if (! requires_lazy_subscription (ros2mqtt))
764+
{
645765
try {
766+
if (ros2mqtt.ros.subscriber)
767+
continue;
768+
646769
ros2mqtt.ros.subscriber = create_generic_subscription(
647770
ros_topic, ros2mqtt.ros.msg_type, ros2mqtt.ros.queue_size, bound_callback_func);
648771
ros2mqtt.ros.is_stale = false;
649-
772+
RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'",
773+
ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str());
650774
} catch (rclcpp::exceptions::RCLError& e) {
651775
RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s",
652776
e.what());
653777
continue;
654778
}
655-
RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'",
656-
ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str());
657-
658779
} else {
659-
const auto &pubs = get_publishers_info_by_topic(ros_topic);
780+
const auto &pubs = getCandidatePublishers (ros_topic, ros2mqtt);
660781

661782
if (pubs.empty()) continue;
662783

663-
const auto endpointInfo = pubs.front ();
664-
// check if message type has changed or if mapping is stale
665-
const std::string msg_type = endpointInfo.topic_type();
784+
for (auto endpointInfo : pubs) {
785+
try {
786+
// check if message type has changed or if mapping is stale
787+
const std::string msg_type = endpointInfo.topic_type();
666788

667-
if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) continue;
668-
ros2mqtt.ros.is_stale = false;
669-
ros2mqtt.ros.msg_type = msg_type;
789+
if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale && ros2mqtt.ros.subscriber)
790+
continue;
670791

671-
try {
672-
auto qos = endpointInfo.qos_profile ();
673-
ros2mqtt.ros.subscriber = create_generic_subscription(
674-
ros_topic, msg_type, qos.keep_last(ros2mqtt.ros.queue_size),
675-
bound_callback_func);
792+
auto const qos = getCompatibleQoS (ros_topic, endpointInfo, ros2mqtt);
676793

677-
} catch (rclcpp::exceptions::RCLError& e) {
678-
RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s",
679-
e.what());
680-
continue;
794+
if (! qos)
795+
continue;
796+
797+
ros2mqtt.ros.is_stale = false;
798+
ros2mqtt.ros.msg_type = msg_type;
799+
800+
ros2mqtt.ros.subscriber = create_generic_subscription(
801+
ros_topic, msg_type, *qos, bound_callback_func);
802+
803+
RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'",
804+
ros_topic.c_str(), msg_type.c_str());
805+
} catch (rclcpp::exceptions::RCLError& e) {
806+
RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s",
807+
e.what());
808+
continue;
809+
}
681810
}
682-
RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'",
683-
ros_topic.c_str(), msg_type.c_str());
684811
}
685812
}
686813
}
687814

688-
689815
void MqttClient::setupClient() {
690816

691817
// basic client connection options
@@ -787,7 +913,7 @@ void MqttClient::ros2mqtt(
787913
// resolve ROS messages to primitive strings if possible
788914
std::string payload;
789915
bool found_primitive =
790-
primitiveRosMessageToString(serialized_msg, ros_msg_type.name, payload);
916+
primitiveRosMessageToString(serialized_msg, ros_msg_type.name, ros2mqtt.fixed_type, payload);
791917
if (found_primitive) {
792918
payload_buffer = std::vector<uint8_t>(payload.begin(), payload.end());
793919
} else {

0 commit comments

Comments
 (0)