1
1
"""Tests ArduPilot launch"""
2
2
import unittest
3
3
import os
4
+ import time
5
+ from typing import List , Tuple
4
6
7
+ import rclpy
5
8
import pytest
9
+ from rclpy .node import Node
6
10
from launch import LaunchDescription
7
11
from launch .actions import IncludeLaunchDescription
8
12
from launch .launch_description_sources import PythonLaunchDescriptionSource
9
13
from launch_testing .actions import ReadyToTest
10
- from geometry_msgs .msg import PoseStamped
11
- from mavros_msgs .msg import HomePosition , MountControl
12
- from sensor_msgs .msg import NavSatFix
13
-
14
- from test .launch .test_px4_launch import TestPX4Launch
14
+ from geometry_msgs .msg import PoseStamped , Quaternion
15
+ from mavros_msgs .msg import HomePosition , MountControl , Altitude
16
+ from sensor_msgs .msg import NavSatFix , CameraInfo , Image
17
+ from std_msgs .msg import Float32
18
+ from geographic_msgs .msg import BoundingBox , GeoPointStamped , GeoPoseStamped
19
+ from gisnav_msgs .msg import OrthoImage3D
15
20
16
21
17
22
@pytest .mark .launch_test
@@ -26,24 +31,43 @@ def generate_test_description():
26
31
])
27
32
28
33
29
- class TestArduPilotLaunch (TestPX4Launch ):
34
+ class TestArduPilotLaunch (unittest . TestCase ):
30
35
"""Test that all nodes initialize with correct ROS topics"""
31
36
32
- # Override
37
+ GISNAV_TOPIC_NAMES_AND_TYPES = [
38
+ ('/gisnav/orthoimage_3d' , OrthoImage3D ),
39
+ ('/gisnav/bounding_box' , BoundingBox ),
40
+ ('/gisnav/vehicle_geopose' , GeoPoseStamped ),
41
+ ('/gisnav/vehicle_geopose/estimate' , GeoPoseStamped ),
42
+ ('/gisnav/vehicle_altitude' , Altitude ),
43
+ ('/gisnav/vehicle_altitude/estimate' , Altitude ),
44
+ ('/gisnav/gimbal_quaternion' , Quaternion ),
45
+ ('/gisnav/home_geopoint' , GeoPointStamped ),
46
+ ('/gisnav/terrain_altitude' , Altitude ),
47
+ ('/gisnav/terrain_geopoint' , GeoPointStamped ),
48
+ ('/gisnav/egm96_height' , Float32 )
49
+ ]
50
+ """List of GISNav internal topic names and types"""
51
+
52
+ CAMERA_TOPIC_NAMES_AND_TYPES = [
53
+ ('/camera/camera_info' , CameraInfo ),
54
+ ('/camera/image_raw' , Image )
55
+ ]
56
+ """List of camera topic names and types"""
57
+
33
58
AUTOPILOT_TOPIC_NAMES_AND_TYPES = [
34
59
('/mavros/global_position/global' , NavSatFix ),
35
- ('/mavros/pose_stamped /pose' , PoseStamped ),
60
+ ('/mavros/local_position /pose' , PoseStamped ),
36
61
('/mavros/home_position/home' , HomePosition ),
37
- ('/mavros/mount_control/comand ' , MountControl ),
62
+ ('/mavros/mount_control/command ' , MountControl ),
38
63
# ('/mavros/gps_input/gps_input', GPSINPUT) # not currently used - uses UDP socket instead of MAVROS topic
39
64
]
40
65
"""List of autopilot topic names and types"""
41
66
42
- TOPIC_NAMES_AND_TYPES = TestPX4Launch . GISNAV_TOPIC_NAMES_AND_TYPES + TestPX4Launch . CAMERA_TOPIC_NAMES_AND_TYPES \
67
+ TOPIC_NAMES_AND_TYPES = GISNAV_TOPIC_NAMES_AND_TYPES + CAMERA_TOPIC_NAMES_AND_TYPES \
43
68
+ AUTOPILOT_TOPIC_NAMES_AND_TYPES
44
69
"""List of all expected topic names and types"""
45
70
46
- # Override
47
71
NODE_NAMES_AND_NAMESPACES = {
48
72
('mock_gps_node' , '/' ),
49
73
('bbox_node' , '/' ),
@@ -53,6 +77,85 @@ class TestArduPilotLaunch(TestPX4Launch):
53
77
}
54
78
"""List of tuples of node names and namespaces"""
55
79
80
+ def __init__ (self , * args , ** kwargs ):
81
+ """Initializer override for declaring attributes used in the test case"""
82
+ super (TestArduPilotLaunch , self ).__init__ (* args , ** kwargs )
83
+ self .test_node = None
84
+
85
+ def _get_names_and_namespaces_within_timeout (self , timeout : int = 5 ) -> List [Tuple [str , str ]]:
86
+ """Returns node names and namespaces found within timeout period
87
+
88
+ :param timeout: Timeout in seconds
89
+ :return: List of tuples containing node name and type
90
+ :raise: :class:`.AssertionError` if names and namespaces are not returned within :param timeout_sec:
91
+ """
92
+ names_and_namespaces = None
93
+ end_time = time .time () + timeout
94
+ while time .time () < end_time : # and names_and_namespaces is None:
95
+ rclpy .spin_once (self .test_node , timeout_sec = 0.1 )
96
+ names_and_namespaces = self .test_node .get_node_names_and_namespaces ()
97
+
98
+ assert names_and_namespaces is not None , f'Could not get node names and namespaces within { timeout } s timeout.'
99
+ return names_and_namespaces
100
+
101
+ def _get_topic_names_and_types_within_timeout (self , timeout : int = 5 ) -> List [Tuple [str , str ]]:
102
+ """Returns topic names and types found within timeout period
103
+
104
+ :param timeout: Timeout in seconds
105
+ :return: List of tuples containing topic name and type
106
+ :raise: :class:`.AssertionError` if names and types are not returned within :param timeout_sec:
107
+ """
108
+ names_and_types = None
109
+ end_time = time .time () + timeout
110
+ while time .time () < end_time : # and names_and_types is None:
111
+ rclpy .spin_once (self .test_node , timeout_sec = 0.1 )
112
+ names_and_types = self .test_node .get_topic_names_and_types ()
113
+
114
+ assert names_and_types is not None , f'Could not get topic names and types within { timeout } s timeout.'
115
+ return names_and_types
116
+
117
+ @classmethod
118
+ def setUpClass (cls ):
119
+ """Initialize ROS context"""
120
+ rclpy .init ()
121
+
122
+ @classmethod
123
+ def tearDownClass (cls ):
124
+ """Shutdown ROS context"""
125
+ rclpy .shutdown ()
126
+
127
+ def setUp (self ) -> None :
128
+ """Creates a ROS node for testing"""
129
+ self .test_node = Node ('test_node' )
130
+
131
+ def tearDown (self ) -> None :
132
+ """Destroys the ROS node"""
133
+ self .test_node .destroy_node ()
134
+
135
+ def test_node_names_and_namespaces (self ):
136
+ """Tests that nodes are running with correct name and namespace"""
137
+ names , _ = tuple (zip (* self .NODE_NAMES_AND_NAMESPACES ))
138
+
139
+ found_names_and_namespaces = self ._get_names_and_namespaces_within_timeout (10 )
140
+ found_names , found_namespaces = tuple (zip (* found_names_and_namespaces ))
141
+
142
+ assert set (names ).issubset (found_names ), f'Not all nodes ({ names } ) were discovered ({ found_names } ).'
143
+ for name , namespace in self .NODE_NAMES_AND_NAMESPACES :
144
+ self .assertEqual (namespace , dict (found_names_and_namespaces ).get (name , None ))
145
+
146
+ def test_topic_names_and_types (self ):
147
+ """Tests that the nodes subscribe to and publish the expected ROS topics"""
148
+ names , _ = tuple (zip (* self .TOPIC_NAMES_AND_TYPES ))
149
+
150
+ found_names_and_types = self ._get_topic_names_and_types_within_timeout ()
151
+ found_names , found_types = tuple (zip (* found_names_and_types ))
152
+
153
+ assert set (names ).issubset (found_names ), f'Not all topics ({ names } ) were discovered ({ found_names } ).'
154
+ for name , type_ in self .TOPIC_NAMES_AND_TYPES :
155
+ types = dict (found_names_and_types ).get (name )
156
+ assert types is not None
157
+ self .assertEqual (type_ .__class__ .__name__ .replace ('Metaclass_' , '' ), types [0 ].split ('/' )[- 1 ])
158
+
56
159
57
160
if __name__ == '__main__' :
58
161
unittest .main ()
0 commit comments