@@ -433,7 +433,6 @@ void MqttClient::loadParameters() {
433
433
if (get_parameter (fmt::format (" bridge.ros2mqtt.{}.ros_type" , ros_topic), ros_type_param)) {
434
434
ros2mqtt.ros .msg_type = ros_type_param.as_string ();
435
435
ros2mqtt.fixed_type = true ;
436
- ros2mqtt.primitive = true ;
437
436
RCLCPP_DEBUG (get_logger (), " Using explicit ROS message type '%s'" , ros2mqtt.ros .msg_type .c_str ());
438
437
}
439
438
@@ -532,7 +531,6 @@ void MqttClient::loadParameters() {
532
531
if (get_parameter (fmt::format (" bridge.mqtt2ros.{}.ros_type" , mqtt_topic), ros_type_param)) {
533
532
mqtt2ros.ros .msg_type = ros_type_param.as_string ();
534
533
mqtt2ros.fixed_type = true ;
535
- mqtt2ros.primitive = true ;
536
534
RCLCPP_DEBUG (get_logger (), " Using explicit ROS message type '%s' for '%s'" , mqtt2ros.ros .msg_type .c_str (), ros_topic.c_str ());
537
535
}
538
536
@@ -1280,11 +1278,17 @@ void MqttClient::connected(const std::string& cause) {
1280
1278
1281
1279
// subscribe MQTT topics
1282
1280
for (const auto & [mqtt_topic, mqtt2ros] : mqtt2ros_) {
1283
- std::string mqtt_topic_to_subscribe = mqtt_topic;
1284
- if (!mqtt2ros.primitive ) // subscribe topics for ROS message types first
1285
- mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic;
1286
- client_->subscribe (mqtt_topic_to_subscribe, mqtt2ros.mqtt .qos );
1287
- RCLCPP_INFO (get_logger (), " Subscribed MQTT topic '%s'" , mqtt_topic_to_subscribe.c_str ());
1281
+ if (!mqtt2ros.primitive ) {
1282
+ std::string const mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic;
1283
+ client_->subscribe (mqtt_topic_to_subscribe, mqtt2ros.mqtt .qos );
1284
+ RCLCPP_INFO (get_logger (), " Subscribed MQTT topic '%s'" , mqtt_topic_to_subscribe.c_str ());
1285
+ }
1286
+ // If not primitive and not fixed, we need the message type before we can public. In that case
1287
+ // wait for the type to come in before subscribing to the data topic
1288
+ if (mqtt2ros.primitive || mqtt2ros.fixed_type ) {
1289
+ client_->subscribe (mqtt_topic, mqtt2ros.mqtt .qos );
1290
+ RCLCPP_INFO (get_logger (), " Subscribed MQTT topic '%s'" , mqtt_topic.c_str ());
1291
+ }
1288
1292
}
1289
1293
}
1290
1294
@@ -1395,13 +1399,12 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
1395
1399
if (mqtt2ros_.count (mqtt_topic) > 0 ) {
1396
1400
Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
1397
1401
1398
- if (mqtt2ros.fixed_type ) {
1399
- mqtt2fixed (mqtt_msg);
1400
- return ;
1401
- }
1402
-
1403
1402
if (mqtt2ros.primitive ) {
1404
- mqtt2primitive (mqtt_msg);
1403
+ if (mqtt2ros.fixed_type ) {
1404
+ mqtt2fixed (mqtt_msg);
1405
+ } else {
1406
+ mqtt2primitive (mqtt_msg);
1407
+ }
1405
1408
return ;
1406
1409
}
1407
1410
}
@@ -1432,6 +1435,21 @@ void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
1432
1435
// if ROS message type has changed or if mapping is stale
1433
1436
if (ros_msg_type.name != mqtt2ros.ros .msg_type || mqtt2ros.ros .is_stale ) {
1434
1437
1438
+ if (mqtt2ros.fixed_type ) {
1439
+ // We should never be in this situation if the type has been set explicitly. As fixed_type
1440
+ // is not currently supported through the service based bridge creation and the type name
1441
+ // not matching means the fixed type specified in the configuration does not match the
1442
+ // one we just recieved
1443
+ if (ros_msg_type.name != mqtt2ros.ros .msg_type )
1444
+ RCLCPP_ERROR (get_logger (),
1445
+ " Unexpected type name received for topic %s (expected %s but received %s)" ,
1446
+ mqtt2ros.ros .topic .c_str (), mqtt2ros.ros .msg_type .c_str (), ros_msg_type.name .c_str ());
1447
+ if (mqtt2ros.ros .is_stale )
1448
+ RCLCPP_ERROR (get_logger (), " Topic %s has been unexpectedly marked stale" ,
1449
+ mqtt2ros.ros .topic .c_str ());
1450
+ return ;
1451
+ }
1452
+
1435
1453
mqtt2ros.ros .msg_type = ros_msg_type.name ;
1436
1454
mqtt2ros.stamped = ros_msg_type.stamped ;
1437
1455
RCLCPP_INFO (get_logger (),
0 commit comments