|
| 1 | +#!/usr/bin/env python3 |
| 2 | +# -*- coding: utf-8 -*- |
| 3 | +""" |
| 4 | +This module defines the ComputeMouthFrame behavior, which takes in the PointStamped |
| 5 | +containing the (x, y, z) position of the mouth in the camera frame, and returns the |
| 6 | +pose of the mouth in the base frame. |
| 7 | +""" |
| 8 | + |
| 9 | +# Standard imports |
| 10 | +from typing import Optional, Union |
| 11 | + |
| 12 | +# Third-party imports |
| 13 | +from geometry_msgs.msg import PointStamped, PoseStamped, Vector3 |
| 14 | +from overrides import override |
| 15 | +import py_trees |
| 16 | +import rclpy |
| 17 | +import tf2_ros |
| 18 | + |
| 19 | +# Local imports |
| 20 | +from ada_feeding.helpers import ( |
| 21 | + BlackboardKey, |
| 22 | + quat_between_vectors, |
| 23 | + get_tf_object, |
| 24 | +) |
| 25 | +from ada_feeding.behaviors import BlackboardBehavior |
| 26 | + |
| 27 | + |
| 28 | +class ComputeMouthFrame(BlackboardBehavior): |
| 29 | + """ |
| 30 | + Compute the mouth PoseStamped in the base frame, where the position is the output |
| 31 | + of face detection and the orientation is facing the forkTip. |
| 32 | + """ |
| 33 | + |
| 34 | + # pylint: disable=arguments-differ |
| 35 | + # We *intentionally* violate Liskov Substitution Princple |
| 36 | + # in that blackboard config (inputs + outputs) are not |
| 37 | + # meant to be called in a generic setting. |
| 38 | + |
| 39 | + # pylint: disable=too-many-arguments |
| 40 | + # These are effectively config definitions |
| 41 | + # They require a lot of arguments. |
| 42 | + |
| 43 | + def blackboard_inputs( |
| 44 | + self, |
| 45 | + detected_mouth_center: Union[BlackboardKey, PointStamped], |
| 46 | + timestamp: Union[BlackboardKey, rclpy.time.Time] = rclpy.time.Time(), |
| 47 | + world_frame: Union[BlackboardKey, str] = "root", # +z will match this frame |
| 48 | + frame_to_orient_towards: Union[ |
| 49 | + BlackboardKey, str |
| 50 | + ] = "forkTip", # +x will point towards this frame |
| 51 | + ) -> None: |
| 52 | + """ |
| 53 | + Blackboard Inputs |
| 54 | +
|
| 55 | + Parameters |
| 56 | + ---------- |
| 57 | + detected_mouth_center : Union[BlackboardKey, PointStamped] |
| 58 | + The detected mouth center in the camera frame. |
| 59 | + timestamp : Union[BlackboardKey, rclpy.time.Time] |
| 60 | + The timestamp of the detected mouth center (default 0 for latest). |
| 61 | + world_frame : Union[BlackboardKey, str] |
| 62 | + The target frame to transform the mouth center to. |
| 63 | + frame_to_orient_towards : Union[BlackboardKey, str] |
| 64 | + The frame that the mouth should orient towards. |
| 65 | + """ |
| 66 | + # pylint: disable=unused-argument, duplicate-code |
| 67 | + # Arguments are handled generically in base class. |
| 68 | + super().blackboard_inputs( |
| 69 | + **{key: value for key, value in locals().items() if key != "self"} |
| 70 | + ) |
| 71 | + |
| 72 | + def blackboard_outputs( |
| 73 | + self, mouth_pose: Optional[BlackboardKey] # PoseStamped |
| 74 | + ) -> None: |
| 75 | + """ |
| 76 | + Blackboard Outputs |
| 77 | +
|
| 78 | + Parameters |
| 79 | + ---------- |
| 80 | + mouth_pose : Optional[BlackboardKey] |
| 81 | + The PoseStamped of the mouth in the base frame. |
| 82 | + """ |
| 83 | + # pylint: disable=unused-argument, duplicate-code |
| 84 | + # Arguments are handled generically in base class. |
| 85 | + super().blackboard_outputs( |
| 86 | + **{key: value for key, value in locals().items() if key != "self"} |
| 87 | + ) |
| 88 | + |
| 89 | + @override |
| 90 | + def setup(self, **kwargs): |
| 91 | + # Docstring copied from @override |
| 92 | + |
| 93 | + # pylint: disable=attribute-defined-outside-init |
| 94 | + # It is okay for attributes in behaviors to be |
| 95 | + # defined in the setup / initialise functions. |
| 96 | + |
| 97 | + # Get Node from Kwargs |
| 98 | + self.node = kwargs["node"] |
| 99 | + |
| 100 | + # Get TF Listener from blackboard |
| 101 | + self.tf_buffer, _, self.tf_lock = get_tf_object(self.blackboard, self.node) |
| 102 | + |
| 103 | + @override |
| 104 | + def update(self) -> py_trees.common.Status: |
| 105 | + # Docstring copied from @override |
| 106 | + |
| 107 | + # Validate inputs |
| 108 | + if not self.blackboard_exists( |
| 109 | + [ |
| 110 | + "detected_mouth_center", |
| 111 | + "timestamp", |
| 112 | + "world_frame", |
| 113 | + "frame_to_orient_towards", |
| 114 | + ] |
| 115 | + ): |
| 116 | + self.logger.error( |
| 117 | + "Missing detected_mouth_center, timestamp, world_frame, or frame_to_orient_towards." |
| 118 | + ) |
| 119 | + return py_trees.common.Status.FAILURE |
| 120 | + detected_mouth_center = self.blackboard_get("detected_mouth_center") |
| 121 | + timestamp = self.blackboard_get("timestamp") |
| 122 | + world_frame = self.blackboard_get("world_frame") |
| 123 | + frame_to_orient_towards = self.blackboard_get("frame_to_orient_towards") |
| 124 | + |
| 125 | + # Lock TF Buffer |
| 126 | + if self.tf_lock.locked(): |
| 127 | + # Not yet, wait for it |
| 128 | + # Use a Timeout decorator to determine failure. |
| 129 | + return py_trees.common.Status.RUNNING |
| 130 | + with self.tf_lock: |
| 131 | + # Transform detected_mouth_center to world_frame |
| 132 | + if not self.tf_buffer.can_transform( |
| 133 | + world_frame, |
| 134 | + detected_mouth_center.header.frame_id, |
| 135 | + timestamp, |
| 136 | + ): |
| 137 | + # Not yet, wait for it |
| 138 | + # Use a Timeout decorator to determine failure. |
| 139 | + self.logger.warning("ComputeMouthFrame waiting on world/camera TF...") |
| 140 | + return py_trees.common.Status.RUNNING |
| 141 | + camera_transform = self.tf_buffer.lookup_transform( |
| 142 | + world_frame, |
| 143 | + detected_mouth_center.header.frame_id, |
| 144 | + timestamp, |
| 145 | + ) |
| 146 | + mouth_point = tf2_ros.TransformRegistration().get(PointStamped)( |
| 147 | + detected_mouth_center, camera_transform |
| 148 | + ) |
| 149 | + self.logger.info(f"Computed mouth point: {mouth_point.point}") |
| 150 | + |
| 151 | + # Transform frame_to_orient_towards to world_frame |
| 152 | + if not self.tf_buffer.can_transform( |
| 153 | + world_frame, |
| 154 | + frame_to_orient_towards, |
| 155 | + timestamp, |
| 156 | + ): |
| 157 | + # Not yet, wait for it |
| 158 | + # Use a Timeout decorator to determine failure. |
| 159 | + self.logger.warning("ComputeMouthFrame waiting on world/forkTip TF...") |
| 160 | + return py_trees.common.Status.RUNNING |
| 161 | + orientation_target_transform = self.tf_buffer.lookup_transform( |
| 162 | + world_frame, |
| 163 | + frame_to_orient_towards, |
| 164 | + timestamp, |
| 165 | + ) |
| 166 | + self.logger.info( |
| 167 | + f"Computed orientation target transform: {orientation_target_transform.transform}" |
| 168 | + ) |
| 169 | + |
| 170 | + # Get the yaw of the face frame |
| 171 | + x_unit = Vector3(x=1.0, y=0.0, z=0.0) |
| 172 | + x_pos = Vector3( |
| 173 | + x=orientation_target_transform.transform.translation.x |
| 174 | + - mouth_point.point.x, |
| 175 | + y=orientation_target_transform.transform.translation.y |
| 176 | + - mouth_point.point.y, |
| 177 | + z=0.0, |
| 178 | + ) |
| 179 | + self.logger.info(f"Computed x_pos: {x_pos}") |
| 180 | + quat = quat_between_vectors(x_unit, x_pos) |
| 181 | + self.logger.info(f"Computed orientation: {quat}") |
| 182 | + |
| 183 | + # Create return object |
| 184 | + mouth_pose = PoseStamped() |
| 185 | + mouth_pose.header.frame_id = mouth_point.header.frame_id |
| 186 | + mouth_pose.header.stamp = mouth_point.header.stamp |
| 187 | + mouth_pose.pose.position = mouth_point.point |
| 188 | + mouth_pose.pose.orientation = quat |
| 189 | + |
| 190 | + # Write to blackboard outputs |
| 191 | + self.blackboard_set("mouth_pose", mouth_pose) |
| 192 | + |
| 193 | + return py_trees.common.Status.SUCCESS |
0 commit comments