@@ -30,16 +30,18 @@ SOFTWARE.
3030#include < filesystem>
3131#include < map>
3232#include < memory>
33+ #include < optional>
3334#include < string>
3435
3536#define FMT_HEADER_ONLY
3637#include < fmt/format.h>
38+ #include < mqtt/async_client.h>
3739#include < mqtt_client_interfaces/srv/is_connected.hpp>
3840#include < mqtt_client_interfaces/srv/new_mqtt2_ros_bridge.hpp>
3941#include < mqtt_client_interfaces/srv/new_ros2_mqtt_bridge.hpp>
40- #include < mqtt/async_client.h>
4142#include < rclcpp/rclcpp.hpp>
4243#include < rclcpp/serialization.hpp>
44+ #include < rclcpp/qos.hpp>
4345#include < std_msgs/msg/float64.hpp>
4446
4547
@@ -71,6 +73,9 @@ class MqttClient : public rclcpp::Node,
7173 explicit MqttClient (const rclcpp::NodeOptions& options);
7274
7375 protected:
76+ struct Ros2MqttInterface ;
77+ struct Mqtt2RosInterface ;
78+
7479 /* *
7580 * @brief Loads ROS parameters from parameter server.
7681 */
@@ -98,8 +103,7 @@ class MqttClient : public rclcpp::Node,
98103 * @return true if parameter was successfully retrieved
99104 * @return false if parameter was not found or default was used
100105 */
101- bool loadParameter (const std::string& key, std::string& value,
102- const std::string& default_value);
106+ bool loadParameter (const std::string& key, std::string& value, const std::string& default_value);
103107
104108 /* *
105109 * @brief Loads requested ROS parameter from parameter server.
@@ -180,10 +184,42 @@ class MqttClient : public rclcpp::Node,
180184 void setup ();
181185
182186 /* *
183- * @brief Checks all active ROS topics in order to set up generic subscribers.
187+ * @brief Get the resolved compatible QOS from the interface and the endpoint
188+ *
189+ * This uses the two endpoints to decide upon a compatible QoS, resolving any "auto" QoS settings
190+ *
191+ * @param ros_topic the ROS topic we are looking on
192+ * @param tei Topic endpoint info
193+ * @param ros2mqtt the ROS to MQTT interface spec
194+ *
195+ * @returns The compatible QoS or nullopt if no compatible combination is found
196+ */
197+ std::optional<rclcpp::QoS> getCompatibleQoS (
198+ const std::string& ros_topic, const rclcpp::TopicEndpointInfo& tei,
199+ const Ros2MqttInterface& ros2mqtt) const ;
200+
201+ /* *
202+ * @brief Get the candiate topic endpoints for subscription matching
203+ *
204+ * @param ros2mqtt the ROS to MQTT interface spec
205+ *
206+ * @returns The compatible QoS or nullopt if no compatible combination is found
207+ */
208+ std::vector<rclcpp::TopicEndpointInfo> getCandidatePublishers (
209+ const std::string& ros_topic, const Ros2MqttInterface& ros2mqtt) const ;
210+
211+ /* *
212+ * @brief Setup any subscriptions we can.
213+ *
214+ * These may be fixed type/QoS, or dynamically matched against active publisher
184215 */
185216 void setupSubscriptions ();
186217
218+ /* *
219+ * @brief Setup any publishers that we can
220+ */
221+ void setupPublishers ();
222+
187223 /* *
188224 * @brief Sets up the client connection options and initializes the client
189225 * object.
@@ -236,12 +272,17 @@ class MqttClient : public rclcpp::Node,
236272 /* *
237273 * @brief Publishes a primitive message received via MQTT to ROS.
238274 *
239- * Currently not implemented.
240- *
241275 * @param mqtt_msg MQTT message
242276 */
243277 void mqtt2primitive (mqtt::const_message_ptr mqtt_msg);
244278
279+ /* *
280+ * @brief Publishes a primitive message received via MQTT to ROS.
281+ *
282+ * @param mqtt_msg MQTT message
283+ */
284+ void mqtt2fixed (mqtt::const_message_ptr mqtt_msg);
285+
245286 /* *
246287 * @brief Callback for when the client has successfully connected to the
247288 * broker.
@@ -351,10 +392,9 @@ class MqttClient : public rclcpp::Node,
351392 std::string user; // /< username
352393 std::string pass; // /< password
353394 struct {
354- bool enabled; // /< whether to connect via SSL/TLS
355- std::filesystem::path
356- ca_certificate; // /< public CA certificate trusted by client
357- } tls; // /< SSL/TLS-related variables
395+ bool enabled; // /< whether to connect via SSL/TLS
396+ std::filesystem::path ca_certificate; // /< public CA certificate trusted by client
397+ } tls; // /< SSL/TLS-related variables
358398 };
359399
360400 /* *
@@ -377,14 +417,14 @@ class MqttClient : public rclcpp::Node,
377417 double keep_alive_interval; // /< keep-alive interval
378418 int max_inflight; // /< maximum number of inflight messages
379419 struct {
380- std::filesystem::path certificate; // /< client certificate
381- std::filesystem::path key; // /< client private keyfile
382- std::string password; // /< decryption password for private key
383- int version; // /< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
384- bool verify; // /< Verify the client should conduct
385- // /< post-connect checks
386- std::vector<std::string> alpn_protos; // /< list of ALPN protocols
387- } tls; // /< SSL/TLS-related variables
420+ std::filesystem::path certificate; // /< client certificate
421+ std::filesystem::path key; // /< client private keyfile
422+ std::string password; // /< decryption password for private key
423+ int version; // /< TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options.h#L305)
424+ bool verify; // /< Verify the client should conduct
425+ // /< post-connect checks
426+ std::vector<std::string> alpn_protos; // /< list of ALPN protocols
427+ } tls; // /< SSL/TLS-related variables
388428 };
389429
390430 /* *
@@ -397,12 +437,18 @@ class MqttClient : public rclcpp::Node,
397437 std::string msg_type; // /< message type of subscriber
398438 int queue_size = 1 ; // /< ROS subscriber queue size
399439 bool is_stale = false ; // /< whether a new generic publisher/subscriber is required
440+ struct {
441+ // If these are set to nullopt then that part of the QoS is determine automatically based on discovery
442+ std::optional<rclcpp::ReliabilityPolicy> reliability;
443+ std::optional<rclcpp::DurabilityPolicy> durability;
444+ } qos;
400445 } ros; // /< ROS-related variables
401446 struct {
402447 std::string topic; // /< MQTT topic
403448 int qos = 0 ; // /< MQTT QoS value
404449 bool retained = false ; // /< whether to retain MQTT message
405450 } mqtt; // /< MQTT-related variables
451+ bool fixed_type = false ; // /< whether the published message type is specified explicitly
406452 bool primitive = false ; // /< whether to publish as primitive message
407453 bool stamped = false ; // /< whether to inject timestamp in MQTT message
408454 };
@@ -412,18 +458,23 @@ class MqttClient : public rclcpp::Node,
412458 */
413459 struct Mqtt2RosInterface {
414460 struct {
415- int qos = 0 ; // /< MQTT QoS value
416- } mqtt; // /< MQTT-related variables
461+ int qos = 0 ; // /< MQTT QoS value
462+ } mqtt; // /< MQTT-related variables
417463 struct {
418464 std::string topic; // /< ROS topic
419465 std::string msg_type; // /< message type of publisher
420466 rclcpp::GenericPublisher::SharedPtr publisher; // /< generic ROS publisher
421467 rclcpp::Publisher<std_msgs::msg::Float64>::SharedPtr
422468 latency_publisher; // /< ROS publisher for latency
423469 int queue_size = 1 ; // /< ROS publisher queue size
470+ struct {
471+ rclcpp::ReliabilityPolicy reliability = rclcpp::ReliabilityPolicy::SystemDefault;
472+ rclcpp::DurabilityPolicy durability = rclcpp::DurabilityPolicy::SystemDefault;
473+ } qos;
424474 bool latched = false ; // /< whether to latch ROS message
425475 bool is_stale = false ; // /< whether a new generic publisher/subscriber is required
426- } ros; // /< ROS-related variables
476+ } ros; // /< ROS-related variables
477+ bool fixed_type = false ; // /< whether the published ros message type is specified explicitly
427478 bool primitive = false ; // /< whether to publish as primitive message (if
428479 // /< coming from non-ROS MQTT client)
429480 bool stamped = false ; // /< whether timestamp is injected
@@ -535,27 +586,25 @@ bool MqttClient::loadParameter(const std::string& key, T& value,
535586
536587
537588template <typename T>
538- bool MqttClient::loadParameter (const std::string& key, std::vector<T>& value)
539- {
589+ bool MqttClient::loadParameter (const std::string& key, std::vector<T>& value) {
540590 const bool found = get_parameter (key, value);
541591 if (found)
542592 RCLCPP_WARN (get_logger (), " Retrieved parameter '%s' = '[%s]'" , key.c_str (),
543- fmt::format (" {}" , fmt::join (value, " , " )).c_str ());
593+ fmt::format (" {}" , fmt::join (value, " , " )).c_str ());
544594 return found;
545595}
546596
547597
548598template <typename T>
549599bool MqttClient::loadParameter (const std::string& key, std::vector<T>& value,
550- const std::vector<T>& default_value)
551- {
600+ const std::vector<T>& default_value) {
552601 const bool found = get_parameter_or (key, value, default_value);
553602 if (!found)
554603 RCLCPP_WARN (get_logger (), " Parameter '%s' not set, defaulting to '%s'" ,
555604 key.c_str (), fmt::format (" {}" , fmt::join (value, " , " )).c_str ());
556605 if (found)
557606 RCLCPP_DEBUG (get_logger (), " Retrieved parameter '%s' = '%s'" , key.c_str (),
558- fmt::format (" {}" , fmt::join (value, " , " )).c_str ());
607+ fmt::format (" {}" , fmt::join (value, " , " )).c_str ());
559608 return found;
560609}
561610
0 commit comments