19
19
from action_msgs .msg import GoalStatus
20
20
from geometry_msgs .msg import TransformStamped , Twist , TwistStamped
21
21
from launch import LaunchDescription
22
+ # from launch.actions import SetEnvironmentVariable
22
23
from launch_ros .actions import Node
23
24
import launch_testing
25
+ import launch_testing .actions
26
+ import launch_testing .asserts
27
+ import launch_testing .markers
28
+ import launch_testing .util
24
29
from nav2_msgs .action import DockRobot , NavigateToPose , UndockRobot
25
30
import pytest
26
31
import rclpy
27
- from rclpy .action import ActionClient , ActionServer
32
+ from rclpy .action .client import ActionClient
33
+ from rclpy .action .server import ActionServer
28
34
from sensor_msgs .msg import BatteryState
29
35
from tf2_ros import TransformBroadcaster
30
36
31
37
32
38
# This test can be run standalone with:
33
39
# python3 -u -m pytest test_docking_server.py -s
34
40
41
+ # If python3-flaky is installed, you can run the test multiple times to
42
+ # try to identify flaky ness.
43
+ # python3 -u -m pytest --force-flaky --min-passes 3 --max-runs 5 -s -v test_docking_server.py
44
+
35
45
@pytest .mark .rostest
46
+ # @pytest.mark.flaky
47
+ # @pytest.mark.flaky(max_runs=5, min_passes=3)
36
48
def generate_test_description ():
37
49
38
50
return LaunchDescription ([
51
+ # SetEnvironmentVariable('RCUTILS_LOGGING_BUFFERED_STREAM', '1'),
52
+ # SetEnvironmentVariable('RCUTILS_LOGGING_USE_STDOUT', '1'),
39
53
Node (
40
54
package = 'opennav_docking' ,
41
55
executable = 'opennav_docking' ,
@@ -74,21 +88,26 @@ class TestDockingServer(unittest.TestCase):
74
88
@classmethod
75
89
def setUpClass (cls ):
76
90
rclpy .init ()
77
- # Latest odom -> base_link
78
- cls .x = 0.0
79
- cls .y = 0.0
80
- cls .theta = 0.0
81
- # Track charge state
82
- cls .is_charging = False
83
- # Latest command velocity
84
- cls .command = Twist ()
85
- cls .node = rclpy .create_node ('test_docking_server' )
86
91
87
92
@classmethod
88
93
def tearDownClass (cls ):
89
- cls .node .destroy_node ()
90
94
rclpy .shutdown ()
91
95
96
+ def setUp (self ):
97
+ # Create a ROS node for tests
98
+ # Latest odom -> base_link
99
+ self .x = 0.0
100
+ self .y = 0.0
101
+ self .theta = 0.0
102
+ # Track charge state
103
+ self .is_charging = False
104
+ # Latest command velocity
105
+ self .command = Twist ()
106
+ self .node = rclpy .create_node ('test_docking_server' )
107
+
108
+ def tearDown (self ):
109
+ self .node .destroy_node ()
110
+
92
111
def command_velocity_callback (self , msg ):
93
112
self .node .get_logger ().info ('Command: %f %f' % (msg .twist .linear .x , msg .twist .angular .z ))
94
113
self .command = msg .twist
@@ -175,24 +194,32 @@ def test_docking_server(self):
175
194
'navigate_to_pose' ,
176
195
self .nav_execute_callback )
177
196
178
- # Spin once so that TF is published
179
- rclpy .spin_once (self .node , timeout_sec = 0.1 )
197
+ # Publish transform
198
+ self .publish ()
199
+
200
+ # Run for 1 seconds to allow tf to propogate
201
+ for _ in range (10 ):
202
+ rclpy .spin_once (self .node , timeout_sec = 0.1 )
203
+ time .sleep (0.1 )
180
204
181
205
# Test docking action
182
206
self .action_result = []
183
- self .dock_action_client .wait_for_server (timeout_sec = 5.0 )
207
+ assert self .dock_action_client .wait_for_server (timeout_sec = 5.0 ), \
208
+ 'dock_robot service not available'
209
+
184
210
goal = DockRobot .Goal ()
185
211
goal .use_dock_id = True
186
212
goal .dock_id = 'test_dock'
187
213
future = self .dock_action_client .send_goal_async (
188
214
goal , feedback_callback = self .action_feedback_callback )
189
215
rclpy .spin_until_future_complete (self .node , future )
190
216
self .goal_handle = future .result ()
191
- assert self .goal_handle .accepted
217
+ assert self .goal_handle is not None , 'goal_handle should not be None'
218
+ assert self .goal_handle .accepted , 'goal_handle not accepted'
192
219
result_future_original = self .goal_handle .get_result_async ()
193
220
194
221
# Run for 2 seconds
195
- for i in range (20 ):
222
+ for _ in range (20 ):
196
223
rclpy .spin_once (self .node , timeout_sec = 0.1 )
197
224
time .sleep (0.1 )
198
225
@@ -201,7 +228,8 @@ def test_docking_server(self):
201
228
goal , feedback_callback = self .action_feedback_callback )
202
229
rclpy .spin_until_future_complete (self .node , future )
203
230
self .goal_handle = future .result ()
204
- assert self .goal_handle .accepted
231
+ assert self .goal_handle is not None , 'goal_handle should not be None'
232
+ assert self .goal_handle .accepted , 'goal_handle not accepted'
205
233
result_future = self .goal_handle .get_result_async ()
206
234
rclpy .spin_until_future_complete (self .node , result_future )
207
235
self .action_result .append (result_future .result ())
@@ -216,7 +244,7 @@ def test_docking_server(self):
216
244
self .node .get_logger ().info ('Goal preempted' )
217
245
218
246
# Run for 0.5 seconds
219
- for i in range (5 ):
247
+ for _ in range (5 ):
220
248
rclpy .spin_once (self .node , timeout_sec = 0.1 )
221
249
time .sleep (0.1 )
222
250
@@ -230,7 +258,8 @@ def test_docking_server(self):
230
258
goal , feedback_callback = self .action_feedback_callback )
231
259
rclpy .spin_until_future_complete (self .node , future )
232
260
self .goal_handle = future .result ()
233
- assert self .goal_handle .accepted
261
+ assert self .goal_handle is not None , 'goal_handle should not be None'
262
+ assert self .goal_handle .accepted , 'goal_handle not accepted'
234
263
result_future = self .goal_handle .get_result_async ()
235
264
rclpy .spin_until_future_complete (self .node , result_future )
236
265
self .action_result .append (result_future .result ())
@@ -247,10 +276,19 @@ def test_docking_server(self):
247
276
future = self .undock_action_client .send_goal_async (goal )
248
277
rclpy .spin_until_future_complete (self .node , future )
249
278
self .goal_handle = future .result ()
250
- assert self .goal_handle .accepted
279
+ assert self .goal_handle is not None , 'goal_handle should not be None'
280
+ assert self .goal_handle .accepted , 'goal_handle not accepted'
251
281
result_future = self .goal_handle .get_result_async ()
252
282
rclpy .spin_until_future_complete (self .node , result_future )
253
283
self .action_result .append (result_future .result ())
254
284
255
285
self .assertEqual (self .action_result [3 ].status , GoalStatus .STATUS_SUCCEEDED )
256
286
self .assertTrue (self .action_result [3 ].result .success )
287
+
288
+
289
+ @launch_testing .post_shutdown_test ()
290
+ class TestProcessOutput (unittest .TestCase ):
291
+
292
+ def test_exit_code (self , proc_info ):
293
+ # Check that all processes in the launch exit with code 0
294
+ launch_testing .asserts .assertExitCodes (proc_info )
0 commit comments