|
24 | 24 | from nav2_msgs.action import DockRobot, NavigateToPose, UndockRobot
|
25 | 25 | import pytest
|
26 | 26 | import rclpy
|
27 |
| -from rclpy.action import ActionClient, ActionServer |
| 27 | +import rclpy.time |
| 28 | +import rclpy.duration |
| 29 | +from rclpy.action.client import ActionClient |
| 30 | +from rclpy.action.server import ActionServer |
28 | 31 | from sensor_msgs.msg import BatteryState
|
29 |
| -from tf2_ros import TransformBroadcaster |
| 32 | +from tf2_ros import TransformBroadcaster, Buffer, TransformListener |
30 | 33 |
|
31 | 34 |
|
32 | 35 | # This test can be run standalone with:
|
@@ -142,9 +145,40 @@ def nav_execute_callback(self, goal_handle):
|
142 | 145 | result.error_code = 0
|
143 | 146 | return result
|
144 | 147 |
|
| 148 | + def check_transform(self, timeout_sec, source_frame='odom', target_frame='base_link'): |
| 149 | + """Try to look up the transform we're publishing""" |
| 150 | + if self.transform_available: |
| 151 | + return True |
| 152 | + |
| 153 | + try: |
| 154 | + # Wait for transform to become available with timeout |
| 155 | + when = rclpy.time.Time() |
| 156 | + transform = self.tf_buffer.lookup_transform( |
| 157 | + target_frame, |
| 158 | + source_frame, |
| 159 | + when, |
| 160 | + timeout=rclpy.duration.Duration(seconds=timeout_sec) |
| 161 | + ) |
| 162 | + |
| 163 | + self.node.get_logger().info('Successfully found transform:') |
| 164 | + self.node.get_logger().info(f'Translation: [{transform.transform.translation.x}, ' |
| 165 | + f'{transform.transform.translation.y}, ' |
| 166 | + f'{transform.transform.translation.z}]') |
| 167 | + self.transform_available = True |
| 168 | + except Exception as e: |
| 169 | + self.node.get_logger().error(f'Error looking up transform: {str(e)}') |
| 170 | + self.transform_available = False |
| 171 | + |
| 172 | + return self.transform_available |
| 173 | + |
145 | 174 | def test_docking_server(self):
|
146 | 175 | # Publish TF for odometry
|
147 | 176 | self.tf_broadcaster = TransformBroadcaster(self.node)
|
| 177 | + # Create tf buffer and listener |
| 178 | + self.tf_buffer = Buffer() |
| 179 | + self.tf_listener = TransformListener(self.tf_buffer, self.node) |
| 180 | + self.transform_available = False |
| 181 | + |
148 | 182 |
|
149 | 183 | # Create a timer to run "control loop" at 20hz
|
150 | 184 | self.timer = self.node.create_timer(0.05, self.timer_callback)
|
@@ -175,20 +209,30 @@ def test_docking_server(self):
|
175 | 209 | 'navigate_to_pose',
|
176 | 210 | self.nav_execute_callback)
|
177 | 211 |
|
178 |
| - # Spin once so that TF is published |
179 |
| - rclpy.spin_once(self.node, timeout_sec=0.1) |
| 212 | + # Publish transform |
| 213 | + self.publish() |
| 214 | + |
| 215 | + # Run for 2 seconds to allow tf to propogate |
| 216 | + for i in range(20): |
| 217 | + rclpy.spin_once(self.node, timeout_sec=0.1) |
| 218 | + time.sleep(0.1) |
| 219 | + |
| 220 | + assert self.check_transform(timeout_sec=1.0), 'transform not available' |
180 | 221 |
|
181 | 222 | # Test docking action
|
182 | 223 | self.action_result = []
|
183 |
| - self.dock_action_client.wait_for_server(timeout_sec=5.0) |
| 224 | + assert self.dock_action_client.wait_for_server(timeout_sec=5.0), \ |
| 225 | + 'dock_robot service not available' |
| 226 | + |
184 | 227 | goal = DockRobot.Goal()
|
185 | 228 | goal.use_dock_id = True
|
186 | 229 | goal.dock_id = 'test_dock'
|
187 | 230 | future = self.dock_action_client.send_goal_async(
|
188 | 231 | goal, feedback_callback=self.action_feedback_callback)
|
189 | 232 | rclpy.spin_until_future_complete(self.node, future)
|
190 | 233 | self.goal_handle = future.result()
|
191 |
| - assert self.goal_handle.accepted |
| 234 | + assert self.goal_handle is not None, 'goal_handle should not be None' |
| 235 | + assert self.goal_handle.accepted, 'goal not accepted' |
192 | 236 | result_future_original = self.goal_handle.get_result_async()
|
193 | 237 |
|
194 | 238 | # Run for 2 seconds
|
|
0 commit comments