diff --git a/README.md b/README.md index c00aa13..e2fad69 100644 --- a/README.md +++ b/README.md @@ -275,58 +275,66 @@ client: ```yaml bridge: - ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics + ros2mqtt: # Array specifying which ROS topics to map to which MQTT topics - ros_topic: # ROS topic whose messages are transformed to MQTT messages mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker primitive: # [false] whether to publish as primitive message inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side) advanced: ros: - queue_size: # [1] ROS subscriber queue size + queue_size: # [1] ROS subscriber queue size mqtt: - qos: # [0] MQTT QoS value - retained: # [false] whether to retain MQTT message - mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics + qos: # [0] MQTT QoS value + retained: # [false] whether to retain MQTT message + mqtt2ros: # Array specifying which MQTT topics to map to which ROS topics - mqtt_topic: # MQTT topic on which messages are received from the broker ros_topic: # ROS topic on which corresponding MQTT messages are published primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client) advanced: mqtt: - qos: # [0] MQTT QoS value + qos: # [0] MQTT QoS value ros: - queue_size: # [1] ROS publisher queue size - latched: # [false] whether to latch ROS message + queue_size: # [1] ROS publisher queue size + latched: # [false] whether to latch ROS message ``` ##### ROS 2 ```yaml bridge: - ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics - ros_topics: # Array specifying which ROS topics to bridge + ros2mqtt: # Object specifying which ROS topics to map to which MQTT topics + ros_topics: # Array specifying which ROS topics to bridge - {{ ros_topic_name }} # The ROS topic that should be bridged, corresponds to the sub-object in the YAML {{ ros_topic_name }}: mqtt_topic: # MQTT topic on which the corresponding ROS messages are sent to the broker primitive: # [false] whether to publish as primitive message + ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the publisher inject_timestamp: # [false] whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side) advanced: ros: - queue_size: # [1] ROS subscriber queue size + queue_size: # [1] ROS subscriber queue size + qos: + reliability: # [auto] One of "auto", "system_default", "reliable", "best_effort". If auto, the QoS is automatically determined via the publisher + durability: # [auto] One of "auto", "system_default", "volatile", "transient_local". If auto, the QoS is automatically determined via the publisher mqtt: - qos: # [0] MQTT QoS value - retained: # [false] whether to retain MQTT message - mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics - mqtt_topics: # Array specifying which ROS topics to bridge + qos: # [0] MQTT QoS value + retained: # [false] whether to retain MQTT message + mqtt2ros: # Object specifying which MQTT topics to map to which ROS topics + mqtt_topics: # Array specifying which ROS topics to bridge - {{ mqtt_topic_name }} # The MQTT topic that should be bridged, corresponds to the sub-object in the YAML {{ mqtt_topic_name }}: ros_topic: # ROS topic on which corresponding MQTT messages are published + ros_type: # [*empty*] If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the MQTT message primitive: # [false] whether to publish as primitive message (if coming from non-ROS MQTT client) advanced: mqtt: - qos: # [0] MQTT QoS value + qos: # [0] MQTT QoS value ros: - queue_size: # [1] ROS publisher queue size - latched: # [false] whether to latch ROS message + queue_size: # [1] ROS publisher queue size + latched: # [false] whether to latch ROS message + qos: + reliability: # [system_default] One of "system_default", "reliable", "best_effort". + durability: # [system_default] One of "system_default", "volatile", "transient_local". ``` ## Primitive Messages diff --git a/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml b/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml new file mode 100644 index 0000000..176a62c --- /dev/null +++ b/mqtt_client/config/params.ros2.fixed-type-and-qos.yaml @@ -0,0 +1,32 @@ +/**/*: + ros__parameters: + broker: + host: localhost + port: 1883 + bridge: + ros2mqtt: + ros_topics: + - /ping/primitive + /ping/primitive: + mqtt_topic: pingpong/primitive + primitive: false + ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: auto + reliability: auto + mqtt2ros: + mqtt_topics: + - pingpong/primitive + pingpong/primitive: + ros_topic: /pong/primitive + primitive: false + ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: transient_local + reliability: reliable diff --git a/mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml b/mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml new file mode 100644 index 0000000..80266b3 --- /dev/null +++ b/mqtt_client/config/params.ros2.primitive-fixed-type-and-qos.yaml @@ -0,0 +1,32 @@ +/**/*: + ros__parameters: + broker: + host: localhost + port: 1883 + bridge: + ros2mqtt: + ros_topics: + - /ping/primitive + /ping/primitive: + mqtt_topic: pingpong/primitive + primitive: true + ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: auto + reliability: auto + mqtt2ros: + mqtt_topics: + - pingpong/primitive + pingpong/primitive: + ros_topic: /pong/primitive + primitive: true + ros_type: std_msgs/msg/Bool + advanced: + ros: + queue_size: 10 + qos: + durability: transient_local + reliability: reliable diff --git a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp index c9a2d38..0d5c6b8 100644 --- a/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp +++ b/mqtt_client/include/mqtt_client/MqttClient.ros2.hpp @@ -30,16 +30,18 @@ SOFTWARE. #include #include #include +#include #include #define FMT_HEADER_ONLY #include +#include #include #include #include -#include #include #include +#include #include @@ -71,6 +73,9 @@ class MqttClient : public rclcpp::Node, explicit MqttClient(const rclcpp::NodeOptions& options); protected: + struct Ros2MqttInterface; + struct Mqtt2RosInterface; + /** * @brief Loads ROS parameters from parameter server. */ @@ -98,8 +103,7 @@ class MqttClient : public rclcpp::Node, * @return true if parameter was successfully retrieved * @return false if parameter was not found or default was used */ - bool loadParameter(const std::string& key, std::string& value, - const std::string& default_value); + bool loadParameter(const std::string& key, std::string& value, const std::string& default_value); /** * @brief Loads requested ROS parameter from parameter server. @@ -180,10 +184,42 @@ class MqttClient : public rclcpp::Node, void setup(); /** - * @brief Checks all active ROS topics in order to set up generic subscribers. + * @brief Get the resolved compatible QOS from the interface and the endpoint + * + * This uses the two endpoints to decide upon a compatible QoS, resolving any "auto" QoS settings + * + * @param ros_topic the ROS topic we are looking on + * @param tei Topic endpoint info + * @param ros2mqtt the ROS to MQTT interface spec + * + * @returns The compatible QoS or nullopt if no compatible combination is found + */ + std::optional getCompatibleQoS( + const std::string& ros_topic, const rclcpp::TopicEndpointInfo& tei, + const Ros2MqttInterface& ros2mqtt) const; + + /** + * @brief Get the candiate topic endpoints for subscription matching + * + * @param ros2mqtt the ROS to MQTT interface spec + * + * @returns The compatible QoS or nullopt if no compatible combination is found + */ + std::vector getCandidatePublishers( + const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const; + + /** + * @brief Setup any subscriptions we can. + * + * These may be fixed type/QoS, or dynamically matched against active publisher */ void setupSubscriptions(); + /** + * @brief Setup any publishers that we can + */ + void setupPublishers(); + /** * @brief Sets up the client connection options and initializes the client * object. @@ -236,12 +272,17 @@ class MqttClient : public rclcpp::Node, /** * @brief Publishes a primitive message received via MQTT to ROS. * - * Currently not implemented. - * * @param mqtt_msg MQTT message */ void mqtt2primitive(mqtt::const_message_ptr mqtt_msg); + /** + * @brief Publishes a primitive message received via MQTT to ROS. + * + * @param mqtt_msg MQTT message + */ + void mqtt2fixed(mqtt::const_message_ptr mqtt_msg); + /** * @brief Callback for when the client has successfully connected to the * broker. @@ -351,10 +392,9 @@ class MqttClient : public rclcpp::Node, std::string user; ///< username std::string pass; ///< password struct { - bool enabled; ///< whether to connect via SSL/TLS - std::filesystem::path - ca_certificate; ///< public CA certificate trusted by client - } tls; ///< SSL/TLS-related variables + bool enabled; ///< whether to connect via SSL/TLS + std::filesystem::path ca_certificate; ///< public CA certificate trusted by client + } tls; ///< SSL/TLS-related variables }; /** @@ -377,14 +417,14 @@ class MqttClient : public rclcpp::Node, double keep_alive_interval; ///< keep-alive interval int max_inflight; ///< maximum number of inflight messages struct { - std::filesystem::path certificate; ///< client certificate - std::filesystem::path key; ///< client private keyfile - std::string password; ///< decryption password for private key - int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305) - bool verify; ///< Verify the client should conduct - ///< post-connect checks - std::vector alpn_protos; ///< list of ALPN protocols - } tls; ///< SSL/TLS-related variables + std::filesystem::path certificate; ///< client certificate + std::filesystem::path key; ///< client private keyfile + std::string password; ///< decryption password for private key + int version; ///< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305) + bool verify; ///< Verify the client should conduct + ///< post-connect checks + std::vector alpn_protos; ///< list of ALPN protocols + } tls; ///< SSL/TLS-related variables }; /** @@ -397,12 +437,18 @@ class MqttClient : public rclcpp::Node, std::string msg_type; ///< message type of subscriber int queue_size = 1; ///< ROS subscriber queue size bool is_stale = false; ///< whether a new generic publisher/subscriber is required + struct { + // If these are set to nullopt then that part of the QoS is determine automatically based on discovery + std::optional reliability; + std::optional durability; + } qos; } ros; ///< ROS-related variables struct { std::string topic; ///< MQTT topic int qos = 0; ///< MQTT QoS value bool retained = false; ///< whether to retain MQTT message } mqtt; ///< MQTT-related variables + bool fixed_type = false; ///< whether the published message type is specified explicitly bool primitive = false; ///< whether to publish as primitive message bool stamped = false; ///< whether to inject timestamp in MQTT message }; @@ -412,8 +458,8 @@ class MqttClient : public rclcpp::Node, */ struct Mqtt2RosInterface { struct { - int qos = 0; ///< MQTT QoS value - } mqtt; ///< MQTT-related variables + int qos = 0; ///< MQTT QoS value + } mqtt; ///< MQTT-related variables struct { std::string topic; ///< ROS topic std::string msg_type; ///< message type of publisher @@ -421,9 +467,14 @@ class MqttClient : public rclcpp::Node, rclcpp::Publisher::SharedPtr latency_publisher; ///< ROS publisher for latency int queue_size = 1; ///< ROS publisher queue size + struct { + rclcpp::ReliabilityPolicy reliability = rclcpp::ReliabilityPolicy::SystemDefault; + rclcpp::DurabilityPolicy durability = rclcpp::DurabilityPolicy::SystemDefault; + } qos; bool latched = false; ///< whether to latch ROS message bool is_stale = false; ///< whether a new generic publisher/subscriber is required - } ros; ///< ROS-related variables + } ros; ///< ROS-related variables + bool fixed_type = false; ///< whether the published ros message type is specified explicitly bool primitive = false; ///< whether to publish as primitive message (if ///< coming from non-ROS MQTT client) bool stamped = false; ///< whether timestamp is injected @@ -535,27 +586,25 @@ bool MqttClient::loadParameter(const std::string& key, T& value, template -bool MqttClient::loadParameter(const std::string& key, std::vector& value) -{ +bool MqttClient::loadParameter(const std::string& key, std::vector& value) { const bool found = get_parameter(key, value); if (found) RCLCPP_WARN(get_logger(), "Retrieved parameter '%s' = '[%s]'", key.c_str(), - fmt::format("{}", fmt::join(value, ", ")).c_str()); + fmt::format("{}", fmt::join(value, ", ")).c_str()); return found; } template bool MqttClient::loadParameter(const std::string& key, std::vector& value, - const std::vector& default_value) -{ + const std::vector& default_value) { const bool found = get_parameter_or(key, value, default_value); if (!found) RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'", key.c_str(), fmt::format("{}", fmt::join(value, ", ")).c_str()); if (found) RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(), - fmt::format("{}", fmt::join(value, ", ")).c_str()); + fmt::format("{}", fmt::join(value, ", ")).c_str()); return found; } diff --git a/mqtt_client/src/MqttClient.ros2.cpp b/mqtt_client/src/MqttClient.ros2.cpp index 5d6247b..c45be95 100644 --- a/mqtt_client/src/MqttClient.ros2.cpp +++ b/mqtt_client/src/MqttClient.ros2.cpp @@ -28,6 +28,8 @@ SOFTWARE. #include #include #include +#include +#include #include #include @@ -59,6 +61,122 @@ const std::string MqttClient::kRosMsgTypeMqttTopicPrefix = const std::string MqttClient::kLatencyRosTopicPrefix = "~/latencies/"; +template +T mqtt2float(mqtt::const_message_ptr mqtt_msg) { + auto str_msg = mqtt_msg->to_string (); + std::size_t pos; + const T v = std::stold(str_msg, &pos); + + if (pos != str_msg.size()) + throw std::invalid_argument ("not all charaters processed"); + + return v; +} + +template +T mqtt2int(mqtt::const_message_ptr mqtt_msg) { + auto str_msg = mqtt_msg->to_string (); + std::size_t pos; + const T v = std::stoll(str_msg, &pos); + + if (pos != str_msg.size()) + throw std::invalid_argument ("not all charaters processed"); + + return v; +} + +bool mqtt2bool(mqtt::const_message_ptr mqtt_msg) { + const std::string str_msg = mqtt_msg->to_string (); + std::string bool_str = mqtt_msg->to_string (); + std::transform(str_msg.cbegin(), str_msg.cend(), bool_str.begin(), + ::tolower); + if (bool_str == "true" || bool_str == "1") return true; + if (bool_str == "false" || bool_str == "0") return false; + + throw std::invalid_argument ("unable to decode string"); +} + +bool fixedMqtt2PrimitiveRos(mqtt::const_message_ptr mqtt_msg, + const std::string& msg_type, + rclcpp::SerializedMessage &serialized_msg) { + try { + if (msg_type == "std_msgs/msg/String") { + std_msgs::msg::String msg; + msg.data = mqtt_msg->to_string(); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Bool") { + std_msgs::msg::Bool msg; + msg.data = mqtt2bool(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Char") { + std_msgs::msg::Char msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/UInt8") { + std_msgs::msg::UInt8 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/UInt16") { + std_msgs::msg::UInt16 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/UInt32") { + std_msgs::msg::UInt32 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/UInt64") { + std_msgs::msg::UInt64 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Int8") { + std_msgs::msg::Int8 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Int16") { + std_msgs::msg::Int16 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Int32") { + std_msgs::msg::Int32 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Int64") { + std_msgs::msg::Int32 msg; + msg.data = mqtt2int(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Float32") { + std_msgs::msg::Float32 msg; + msg.data = mqtt2float(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else if (msg_type == "std_msgs/msg/Float64") { + std_msgs::msg::Float64 msg; + msg.data = mqtt2float(mqtt_msg); + + serializeRosMessage(msg, serialized_msg); + } else { + throw std::domain_error("Unhandled message type (" + msg_type + ")"); + } + + return true; + + } catch (const std::exception &) { + return false; + } + + +} /** * @brief Extracts string of primitive data types from ROS message. @@ -200,10 +318,16 @@ void MqttClient::loadParameters() { declare_parameter(fmt::format("bridge.ros2mqtt.{}.mqtt_topic", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to publish as primitive message"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); + param_desc.description = "If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the publisher"; + declare_parameter(fmt::format("bridge.ros2mqtt.{}.ros_type", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); param_desc.description = "ROS subscriber queue size"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.queue_size", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); + param_desc.description = "ROS subscriber QoS reliability"; + declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "ROS subscriber QoS durability"; + declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "MQTT QoS value"; declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); param_desc.description = "whether to retain MQTT message"; @@ -218,10 +342,16 @@ void MqttClient::loadParameters() { declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to publish as primitive message (if coming from non-ROS MQTT client)"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); + param_desc.description = "If set, the ROS msg type provided will be used. If empty, the type is automatically deduced via the MQTT message"; + declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_type", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "MQTT QoS value"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); param_desc.description = "ROS publisher queue size"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc); + param_desc.description = "ROS publisher QoS durability"; + declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.durability", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); + param_desc.description = "ROS publisher QoS reliability"; + declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.reliability", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc); param_desc.description = "whether to latch ROS message"; declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc); } @@ -294,6 +424,14 @@ void MqttClient::loadParameters() { if (get_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), primitive_param)) ros2mqtt.primitive = primitive_param.as_bool(); + // ros2mqtt[k]/ros_type + rclcpp::Parameter ros_type_param; + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.ros_type", ros_topic), ros_type_param)) { + ros2mqtt.ros.msg_type = ros_type_param.as_string(); + ros2mqtt.fixed_type = true; + RCLCPP_DEBUG(get_logger(), "Using explicit ROS message type '%s'", ros2mqtt.ros.msg_type.c_str()); + } + // ros2mqtt[k]/inject_timestamp rclcpp::Parameter stamped_param; if (get_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), stamped_param)) @@ -313,6 +451,40 @@ void MqttClient::loadParameters() { queue_size_param)) ros2mqtt.ros.queue_size = queue_size_param.as_int(); + rclcpp::Parameter durability_param; + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.durability", ros_topic), durability_param)) { + const auto p = durability_param.as_string(); + if (p == "system_default") { + ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::SystemDefault; + } else if (p == "volatile") { + ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile; + } else if (p == "transient_local") { + ros2mqtt.ros.qos.durability = rclcpp::DurabilityPolicy::TransientLocal; + } else if (p == "auto") { + ros2mqtt.ros.qos.durability = {}; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected durability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + + rclcpp::Parameter reliability_param; + if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.qos.reliability", ros_topic), reliability_param)) { + const auto p = reliability_param.as_string(); + if (p == "system_default") { + ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::SystemDefault; + } else if (p == "best_effort") { + ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort; + } else if (p == "reliable") { + ros2mqtt.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable; + } else if (p == "auto") { + ros2mqtt.ros.qos.reliability = {}; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected reliability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + // ros2mqtt[k]/advanced/mqtt/qos rclcpp::Parameter qos_param; if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), qos_param)) @@ -350,6 +522,14 @@ void MqttClient::loadParameters() { if (get_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), primitive_param)) mqtt2ros.primitive = primitive_param.as_bool(); + + rclcpp::Parameter ros_type_param; + if (get_parameter(fmt::format("bridge.mqtt2ros.{}.ros_type", mqtt_topic), ros_type_param)) { + mqtt2ros.ros.msg_type = ros_type_param.as_string(); + mqtt2ros.fixed_type = true; + RCLCPP_DEBUG(get_logger(), "Using explicit ROS message type '%s' for '%s'", mqtt2ros.ros.msg_type.c_str(), ros_topic.c_str()); + } + // mqtt2ros[k]/advanced/mqtt/qos rclcpp::Parameter qos_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param)) @@ -361,6 +541,37 @@ void MqttClient::loadParameters() { queue_size_param)) mqtt2ros.ros.queue_size = queue_size_param.as_int(); + rclcpp::Parameter durability_param; + if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.durability", mqtt_topic), durability_param)) { + const auto p = durability_param.as_string(); + if (p == "system_default") { + mqtt2ros.ros.qos.durability = rclcpp::DurabilityPolicy::SystemDefault; + } else if (p == "volatile") { + mqtt2ros.ros.qos.durability = rclcpp::DurabilityPolicy::Volatile; + } else if (p == "transient_local") { + mqtt2ros.ros.qos.durability = + rclcpp::DurabilityPolicy::TransientLocal; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected durability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + + rclcpp::Parameter reliability_param; + if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.qos.reliability", mqtt_topic), reliability_param)) { + const auto p = reliability_param.as_string(); + if (p == "system_default") { + mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::SystemDefault; + } else if (p == "best_effort") { + mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::BestEffort; + } else if (p == "reliable") { + mqtt2ros.ros.qos.reliability = rclcpp::ReliabilityPolicy::Reliable; + } else { + RCLCPP_ERROR(get_logger(), "Unexpected reliability %s", p.c_str ()); + exit(EXIT_FAILURE); + } + } + // mqtt2ros[k]/advanced/ros/latched rclcpp::Parameter latched_param; if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), latched_param)) { @@ -463,42 +674,184 @@ void MqttClient::setup() { check_subscriptions_timer_ = create_wall_timer(std::chrono::duration(1.0), std::bind(&MqttClient::setupSubscriptions, this)); + + setupPublishers (); +} + +std::optional MqttClient::getCompatibleQoS (const std::string &ros_topic, const rclcpp::TopicEndpointInfo &tei, + const Ros2MqttInterface &ros2mqtt) const +{ + // the basic premise here is that we take the QoS from the publisher, overwrite any parts that are explicitly set + // via configuration then check if the result is compatible with the publisher + auto qos = tei.qos_profile (); + + if (auto r = ros2mqtt.ros.qos.reliability) + qos.reliability (*r); + if (auto d = ros2mqtt.ros.qos.durability) + qos.durability (*d); + qos.keep_last (ros2mqtt.ros.queue_size); + + const auto qres = rclcpp::qos_check_compatible (tei.qos_profile (), qos); + + switch (qres.compatibility) + { + case rclcpp::QoSCompatibility::Ok: + return qos; + case rclcpp::QoSCompatibility::Warning: + RCLCPP_DEBUG(get_logger(), "QoS compatibility warning on topic %s - %s", ros_topic.c_str(), qres.reason.c_str ()); + // presumably this is still compatible + return qos; + case rclcpp::QoSCompatibility::Error: + default: + return {}; + } + } +std::vector MqttClient::getCandidatePublishers( + const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const +{ + const auto &pubs = get_publishers_info_by_topic(ros_topic); + + if (pubs.empty ()) + return {}; + + std::vector ret; + + ret.reserve (pubs.size ()); + + for (const auto &p : pubs) + { + const std::string msg_type = p.topic_type(); + + // if the type isn't set, match aginst all t ypes, otherwise only match against mtching types + if (ros2mqtt.ros.msg_type.empty () || msg_type == ros2mqtt.ros.msg_type) + ret.push_back (p); + } + + // If we found any matching types, return those + if (! ret.empty ()) + return ret; + + // If we didn't and we aren't fix type, then just return the full set of publishers + if (! ros2mqtt.fixed_type) + return pubs; + + // None of these publishers will work... :sad_panda: + return {}; +} void MqttClient::setupSubscriptions() { - // get info of all topics - const auto all_topics_and_types = get_topic_names_and_types(); + // For each ros2mqtt interface, check if we need to do a lazy subscription (eg, lazy subscription would be if we + // are determining either the type of QoS dynamically). If we don't, then just make the fixed subscriber if its not + // already made. + // + // If we do, we check each publisher for a potential match. The first step is to filter down the list of + // publishers based on the type. If we are fixed type then that list only includes publishers with matching types. + // If we aren't its a little trickier. For dynamic types, if we already have matched against a type previously, the + // candidate list will include all publishers which have matching types if any exist. If none of the publishers + // have matching types, then the list will include all publishers. + // + // Then for each candidate publisher, check if their QoS is compatible with the QoS specic in the configuration. This + // is really just needed because there is the potential for someone to set half of the QoS to auto and half to + // explicit. + // Condition for requiring "lazy" subscription where we need to walk the + const auto requires_lazy_subscription = [] (const Ros2MqttInterface &ros2mqtt) + { + if (! ros2mqtt.fixed_type) + return true; + if (ros2mqtt.ros.qos.reliability == std::nullopt || + ros2mqtt.ros.qos.durability == std::nullopt) + return true; + return false; + }; - // check for ros2mqtt topics for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) { - if (all_topics_and_types.count(ros_topic)) { - - // check if message type has changed or if mapping is stale - const std::string& msg_type = all_topics_and_types.at(ros_topic)[0]; - if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale) continue; - ros2mqtt.ros.is_stale = false; - ros2mqtt.ros.msg_type = msg_type; - - // create new generic subscription, if message type has changed - std::function msg)> - bound_callback_func = std::bind(&MqttClient::ros2mqtt, this, - std::placeholders::_1, ros_topic); + + std::function msg)> bound_callback_func = + std::bind(&MqttClient::ros2mqtt, this, std::placeholders::_1, ros_topic); + + if (! requires_lazy_subscription (ros2mqtt)) + { try { + if (ros2mqtt.ros.subscriber) + continue; + + auto const qos = rclcpp::QoS(ros2mqtt.ros.queue_size) + .reliability(*ros2mqtt.ros.qos.reliability) + .durability(*ros2mqtt.ros.qos.durability); + ros2mqtt.ros.subscriber = create_generic_subscription( - ros_topic, msg_type, ros2mqtt.ros.queue_size, bound_callback_func); + ros_topic, ros2mqtt.ros.msg_type, qos, bound_callback_func); + ros2mqtt.ros.is_stale = false; + RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", + ros_topic.c_str(), ros2mqtt.ros.msg_type.c_str()); } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", e.what()); - return; + continue; + } + } else { + const auto &pubs = getCandidatePublishers (ros_topic, ros2mqtt); + + if (pubs.empty()) continue; + + for (auto endpointInfo : pubs) { + try { + // check if message type has changed or if mapping is stale + const std::string msg_type = endpointInfo.topic_type(); + + if (msg_type == ros2mqtt.ros.msg_type && !ros2mqtt.ros.is_stale && ros2mqtt.ros.subscriber) + continue; + + auto const qos = getCompatibleQoS (ros_topic, endpointInfo, ros2mqtt); + + if (! qos) + continue; + + ros2mqtt.ros.is_stale = false; + ros2mqtt.ros.msg_type = msg_type; + + ros2mqtt.ros.subscriber = create_generic_subscription( + ros_topic, msg_type, *qos, bound_callback_func); + + RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", + ros_topic.c_str(), msg_type.c_str()); + } catch (rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s", + e.what()); + continue; + } } - RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'", - ros_topic.c_str(), msg_type.c_str()); } } } +void MqttClient::setupPublishers() { + + for (auto& [mqtt_topic, mqtt2ros] : mqtt2ros_) { + if (mqtt2ros.ros.publisher) + continue; + + // If the type is not fixed, we require a mqtt message in order to deduce the type + if (!mqtt2ros.fixed_type) + continue; + + try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); + mqtt2ros.ros.publisher = create_generic_publisher( + mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, qos); + + mqtt2ros.ros.is_stale = false; + } catch (const rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", + e.what()); + } + } +} void MqttClient::setupClient() { @@ -701,7 +1054,6 @@ void MqttClient::ros2mqtt( void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg, const rclcpp::Time& arrival_stamp) { - std::string mqtt_topic = mqtt_msg->get_topic(); Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; auto& payload = mqtt_msg->get_payload_ref(); @@ -762,8 +1114,8 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) { std::string mqtt_topic = mqtt_msg->get_topic(); Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; - const std::string str_msg = mqtt_msg->to_string(); + const std::string str_msg = mqtt_msg->to_string(); bool found_primitive = false; std::string ros_msg_type = "std_msgs/msg/String"; rclcpp::SerializedMessage serialized_msg; @@ -846,8 +1198,11 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) { // recreate generic publisher try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); mqtt2ros.ros.publisher = create_generic_publisher( - mqtt2ros.ros.topic, ros_msg_type, mqtt2ros.ros.queue_size); + mqtt2ros.ros.topic, ros_msg_type, qos); } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", e.what()); @@ -864,6 +1219,50 @@ void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) { mqtt2ros.ros.publisher->publish(serialized_msg); } +void MqttClient::mqtt2fixed(mqtt::const_message_ptr mqtt_msg) { + + std::string mqtt_topic = mqtt_msg->get_topic(); + Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; + + rclcpp::SerializedMessage serialized_msg; + + if (!fixedMqtt2PrimitiveRos(mqtt_msg, mqtt2ros.ros.msg_type, serialized_msg)) { + RCLCPP_WARN( + get_logger(), + "Could not convert mqtt message into type %s on topic %s ...", + mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str()); + } else { + + if (!mqtt2ros.ros.publisher) + { + RCLCPP_INFO(get_logger(), + "ROS publisher message type on topic '%s' set to '%s'", + mqtt2ros.ros.topic.c_str(), mqtt2ros.ros.msg_type.c_str()); + + // recreate generic publisher + try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); + mqtt2ros.ros.publisher = create_generic_publisher( + mqtt2ros.ros.topic, mqtt2ros.ros.msg_type, qos); + + mqtt2ros.ros.is_stale = false; + } catch (const rclcpp::exceptions::RCLError& e) { + RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", + e.what()); + return; + } + } + + RCLCPP_DEBUG(get_logger(), + "Sending ROS message of type '%s' from MQTT broker to ROS " + "topic '%s' ...", + mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str()); + mqtt2ros.ros.publisher->publish(serialized_msg); + } +} + void MqttClient::connected(const std::string& cause) { @@ -879,11 +1278,17 @@ void MqttClient::connected(const std::string& cause) { // subscribe MQTT topics for (const auto& [mqtt_topic, mqtt2ros] : mqtt2ros_) { - std::string mqtt_topic_to_subscribe = mqtt_topic; - if (!mqtt2ros.primitive) // subscribe topics for ROS message types first - mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic; - client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos); - RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str()); + if (!mqtt2ros.primitive) { + std::string const mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic; + client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos); + RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str()); + } + // If not primitive and not fixed, we need the message type before we can public. In that case + // wait for the type to come in before subscribing to the data topic + if (mqtt2ros.primitive || mqtt2ros.fixed_type) { + client_->subscribe(mqtt_topic, mqtt2ros.mqtt.qos); + RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic.c_str()); + } } } @@ -993,8 +1398,13 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) { // publish directly if primitive if (mqtt2ros_.count(mqtt_topic) > 0) { Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic]; + if (mqtt2ros.primitive) { - mqtt2primitive(mqtt_msg); + if (mqtt2ros.fixed_type) { + mqtt2fixed(mqtt_msg); + } else { + mqtt2primitive(mqtt_msg); + } return; } } @@ -1025,6 +1435,21 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) { // if ROS message type has changed or if mapping is stale if (ros_msg_type.name != mqtt2ros.ros.msg_type || mqtt2ros.ros.is_stale) { + if (mqtt2ros.fixed_type) { + // We should never be in this situation if the type has been set explicitly. As fixed_type + // is not currently supported through the service based bridge creation and the type name + // not matching means the fixed type specified in the configuration does not match the + // one we just recieved + if (ros_msg_type.name != mqtt2ros.ros.msg_type) + RCLCPP_ERROR(get_logger(), + "Unexpected type name received for topic %s (expected %s but received %s)", + mqtt2ros.ros.topic.c_str(), mqtt2ros.ros.msg_type.c_str(), ros_msg_type.name.c_str()); + if (mqtt2ros.ros.is_stale) + RCLCPP_ERROR(get_logger(), "Topic %s has been unexpectedly marked stale", + mqtt2ros.ros.topic.c_str()); + return; + } + mqtt2ros.ros.msg_type = ros_msg_type.name; mqtt2ros.stamped = ros_msg_type.stamped; RCLCPP_INFO(get_logger(), @@ -1033,8 +1458,11 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) { // recreate generic publisher try { + const auto qos = rclcpp::QoS(mqtt2ros.ros.queue_size) + .durability(mqtt2ros.ros.qos.durability) + .reliability(mqtt2ros.ros.qos.reliability); mqtt2ros.ros.publisher = create_generic_publisher( - mqtt2ros.ros.topic, ros_msg_type.name, mqtt2ros.ros.queue_size); + mqtt2ros.ros.topic, ros_msg_type.name, qos); } catch (rclcpp::exceptions::RCLError& e) { RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s", e.what());