Feature/add lidar depth #9
base: main
Changes from all commits
bed7b79
ad1cc36
f92d0da
3b4dffe
f8b4978
align_pointcloud.py
@@ -0,0 +1,119 @@
###########################################################
#
# align_pointcloud.py
#
# A class to align a point cloud with a camera & project onto the camera image frame
#
# Authors: Qingyuan Li
#
# January 27, 2025
#
###########################################################

from robotdatapy.data import PointCloudData, ImgData, PoseData
import numpy as np
import cv2 as cv
from roman.utils import expandvars_recursive

class AlignPointCloud():
    pointcloud_data: PointCloudData
    img_data: ImgData
    camera_pose_data: PoseData
    T_camera_rangesens_static: np.ndarray

    def __init__(self, pointcloud_data: PointCloudData, img_data: ImgData, camera_pose_data: PoseData, tf_bag_path: str):
        """
        Class for aligning a point cloud with a camera and projecting the point cloud onto the camera's image frame

        Args:
            pointcloud_data (PointCloudData): point cloud data class
            img_data (ImgData): image data class
            camera_pose_data (PoseData): pose data class
            tf_bag_path (str): path to bag with static tf data (needed to calculate the static transform from
                the camera to the range sensor)
        """
        self.pointcloud_data = pointcloud_data
        self.img_data = img_data
        self.camera_pose_data = camera_pose_data

        # calculate static transform from camera to range sensor
        self.pointcloud_frame = pointcloud_data.pointcloud(pointcloud_data.times[0]).header.frame_id
        self.camera_frame = img_data.img_header(img_data.times[0]).frame_id
        self.img_shape = img_data.img(img_data.times[0]).shape[:2]
        self.T_camera_rangesens_static = PoseData.any_static_tf_from_bag(expandvars_recursive(tf_bag_path), self.camera_frame, self.pointcloud_frame)

    def aligned_pointcloud(self, t: float):
        """
        Return the closest point cloud to time t, aligned to the camera frame

        Args:
            t (float): time to get the closest point cloud to

        Returns:
            np.ndarray: (n, 3) array containing the 3D point cloud in the frame of the camera (Z forward, Y down)
        """
        pcl = self.pointcloud_data.pointcloud(t)

        img_header = self.img_data.img_header(t)

        # get exact times of the point cloud and image messages
        pointcloud_time = pcl.header.stamp.sec + 1e-9 * pcl.header.stamp.nanosec
        img_time = img_header.stamp.sec + 1e-9 * img_header.stamp.nanosec

        # calculate dynamic transform between the robot at the time of the image and the robot at the time of the point cloud
        T_W_camera_pointcloud_time = self.camera_pose_data.T_WB(pointcloud_time)
        T_W_camera_image_time = self.camera_pose_data.T_WB(img_time)
        T_W_rangesens_pointcloud_time = T_W_camera_pointcloud_time @ self.T_camera_rangesens_static
        T_W_rangesens_image_time = T_W_camera_image_time @ self.T_camera_rangesens_static

        T_img_cloud_dynamic = np.linalg.inv(T_W_rangesens_image_time) @ T_W_rangesens_pointcloud_time

        # compose the static and dynamic transforms to approximate the transform between
        # the camera at the image time and the range sensor at the point cloud time
        T_camera_rangesens = self.T_camera_rangesens_static @ T_img_cloud_dynamic

        # transform points into the camera frame
        points = pcl.get_xyz()
        points_h = np.hstack((points, np.ones((points.shape[0], 1))))
        points_h_camera_frame = points_h @ T_camera_rangesens.T
        points_camera_frame = points_h_camera_frame[:, :3]

        # mask for points in front of the camera (non-negative Z in the camera frame)
        in_front_mask = points_camera_frame[:, 2] >= 0
        points_camera_frame_filtered = points_camera_frame[in_front_mask]

        return points_camera_frame_filtered

    def projected_pointcloud(self, points_camera_frame):
        """
        Projects a 3D point cloud in the camera frame (from aligned_pointcloud) to
        the 2D image frame

        Args:
            points_camera_frame (np.ndarray): (n, 3) array containing 3D point cloud in the frame of the camera (Z forward, Y down)

        Returns:
            np.ndarray: (n, 2) array containing 2D projected points in (u, v) coordinates, order unchanged
        """
        # project point cloud onto the 2D image
        points_camera_frame_cv = points_camera_frame.reshape(-1, 1, 3)
        points_2d_cv, _ = cv.projectPoints(points_camera_frame_cv, np.zeros(3), np.zeros(3), self.img_data.K, self.img_data.D)
        points_2d = points_2d_cv.reshape((-1, 2))

        return points_2d

    def filter_pointcloud_and_projection(self, points_camera_frame, points_2d):
        """
        Filters the projected (u, v) coordinates and the associated 3D point cloud, keeping only points
        whose projections fall inside the RGB image bounds, and casts the (u, v) coordinates to int

        Args:
            points_camera_frame (np.ndarray): (n, 3) array containing 3D point cloud in the frame of the camera (Z forward, Y down)
            points_2d (np.ndarray): (n, 2) array containing 2D projected points in (u, v) coordinates, in the same order

        Returns:
            tuple[np.ndarray, np.ndarray]: the filtered (m, 3) point cloud and the corresponding (m, 2) integer (u, v) coordinates
        """
        points_2d = np.round(points_2d).astype(int)
        inside_frame = (points_2d[:, 0] >= 0) & (points_2d[:, 0] < self.img_shape[1]) & (points_2d[:, 1] >= 0) & (points_2d[:, 1] < self.img_shape[0])
        points_camera_frame = points_camera_frame[inside_frame]
        points_2d = points_2d[inside_frame]
        return points_camera_frame, points_2d
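
A minimal usage sketch of the new class (not part of the diff). It assumes pointcloud_data, img_data, and camera_pose_data have already been loaded through robotdatapy; the import path and the bag path below are illustrative, not taken from this PR.

    # Sketch only: module path and bag path are assumptions.
    from roman.align_pointcloud import AlignPointCloud

    aligner = AlignPointCloud(pointcloud_data, img_data, camera_pose_data,
                              tf_bag_path="/path/to/static_tf.bag")

    t = img_data.times[0]                                 # query at an image timestamp
    points_3d = aligner.aligned_pointcloud(t)             # (n, 3) points in the camera frame
    points_2d = aligner.projected_pointcloud(points_3d)   # (n, 2) pixel (u, v) coordinates
    points_3d, points_2d = aligner.filter_pointcloud_and_projection(points_3d, points_2d)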
@@ -5,7 +5,7 @@
# A Python wrapper for sending RGBD images to FastSAM and using segmentation
# masks to create object observations.
#
# Authors: Jouko Kinnari, Mason Peterson, Lucas Jia, Annika Thomas
# Authors: Jouko Kinnari, Mason Peterson, Lucas Jia, Annika Thomas, Qingyuan Li
#
# Dec. 21, 2024
#

@@ -74,8 +74,9 @@ def __init__(self,
device='cuda',
mask_downsample_factor=1,
rotate_img=None,
use_pointcloud=False
):
"""Wrapper for running FastSAM on images (especially RGB/depth pairs)
"""Wrapper for running FastSAM on images (RGB/depth data)

Args:
weights (str): Path to FastSAM weights.

@@ -87,6 +88,7 @@ def __init__(self,
Defaults to 1.
rotate_img (_type_, optional): 'CW', 'CCW', or '180' for rotating image before
feeding into FastSAM. Defaults to None.
use_pointcloud (bool, optional): True if the depth data source is a point cloud.
"""
# parameters
self.weights = weights

@@ -96,6 +98,7 @@ def __init__(self,
self.imgsz = imgsz
self.mask_downsample_factor = mask_downsample_factor
self.rotate_img = rotate_img
self.use_pointcloud = use_pointcloud

# member variables
self.observations = []

@@ -111,13 +114,14 @@ def __init__(self,
or self.rotate_img == '180', "Invalid rotate_img option."

@classmethod
def from_params(cls, params: FastSAMParams, depth_cam_params: CameraParams):
def from_params(cls, params: FastSAMParams, depth_cam_params: CameraParams, use_pointcloud: bool):

Review comment: Can we make …

fastsam = cls(
weights=expandvars_recursive(params.weights_path),
imgsz=params.imgsz,
device=params.device,
mask_downsample_factor=params.mask_downsample_factor,
rotate_img=params.rotate_img
rotate_img=params.rotate_img,
use_pointcloud=use_pointcloud
)
fastsam.setup_rgbd_params(
depth_cam_params=depth_cam_params,

@@ -211,14 +215,15 @@ def setup_rgbd_params(
self.max_depth = max_depth
self.within_depth_frac = within_depth_frac
self.depth_scale = depth_scale
self.depth_cam_intrinsics = o3d.camera.PinholeCameraIntrinsic(
width=int(depth_cam_params.width),
height=int(depth_cam_params.height),
fx=depth_cam_params.fx,
fy=depth_cam_params.fy,
cx=depth_cam_params.cx,
cy=depth_cam_params.cy,
)
if not self.use_pointcloud:
self.depth_cam_intrinsics = o3d.camera.PinholeCameraIntrinsic(
width=int(depth_cam_params.width),
height=int(depth_cam_params.height),
fx=depth_cam_params.fx,
fy=depth_cam_params.fy,
cx=depth_cam_params.cx,
cy=depth_cam_params.cy,
)
self.voxel_size = voxel_size
self.pcd_stride = pcd_stride
if erosion_size > 0:

@@ -230,7 +235,7 @@ def setup_rgbd_params(
self.erosion_element = None
self.plane_filter_params = plane_filter_params

def run(self, t, pose, img, img_depth=None, plot=False):
def run(self, t, pose, img, depth_data=None, plot=False):
"""
Takes an image and returns filtered FastSAM masks as Observations.


@@ -247,6 +252,9 @@ def run(self, t, pose, img, img_depth=None, plot=False):
img_orig = img
img = self.apply_rotation(img)

if self.use_pointcloud:
pcl, pcl_proj = depth_data

if self.run_yolo:
ignore_mask, keep_mask = self._create_mask(img)
else:

@@ -262,40 +270,92 @@ def run(self, t, pose, img, img_depth=None, plot=False):

# Extract point cloud of object from RGBD
ptcld = None
if img_depth is not None:
depth_obj = copy.deepcopy(img_depth)
if self.erosion_element is not None:
eroded_mask = cv.erode(mask, self.erosion_element)
depth_obj[eroded_mask==0] = 0
if depth_data is not None:
if self.use_pointcloud:

Review comment: Could we clean up these comments a bit for the PR?

# if False: # visualizations for debugging
# # point_cloud = o3d.geometry.PointCloud()
# # point_cloud.points = o3d.utility.Vector3dVector(pcl)
# # point_cloud.paint_uniform_color([1, 0, 0])
# # coordinate_frame = o3d.geometry.TriangleMesh.create_coordinate_frame(size=10.0)

# # o3d.visualization.draw_geometries([point_cloud, coordinate_frame])

# import matplotlib.pyplot as plt

# img_shape = img.shape[:2]
# depth_image = np.zeros(img_shape, dtype=np.float32)
# depths = pcl[:, 2]
# MAX_DEPTH=15

# for i in range(pcl_proj.shape[0]):
# u, v = pcl_proj[i]
# u, v = int(u), int(v)
# depth = depths[i]
# if 0 <= u < img_shape[1] and 0 <= v < img_shape[0] and depth <= MAX_DEPTH:
# depth = depths[i]
# depth_image[v, u] = depth

# depth_normalized = cv.normalize(depth_image, None, 0, 1, cv.NORM_MINMAX)
# depth_colored = cv.applyColorMap(((1 - depth_normalized) * 255).astype(np.uint8), cv.COLORMAP_JET)
# rgb_image_rgb = cv.cvtColor(img, cv.COLOR_BGR2RGB)

# overlay = depth_colored
# overlay[depth_image == 0] = rgb_image_rgb[depth_image == 0]
# plt.imshow(overlay)
# plt.show()

# # import ipdb
# # ipdb.set_trace()

# get 3D points that project within the mask
inside_mask = mask[pcl_proj[:, 1], pcl_proj[:, 0]] == 1
inside_mask_points = pcl[inside_mask]
pre_truncate_len = len(inside_mask_points)
ptcld_test = inside_mask_points[inside_mask_points[:, 2] < self.max_depth]

if len(ptcld_test) < self.within_depth_frac*pre_truncate_len:
continue

pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(inside_mask_points)

else:
depth_obj[mask==0] = 0
logger.debug(f"img_depth type {img_depth.dtype}, shape={img_depth.shape}")

# Extract point cloud without truncation to heuristically check if enough of the object
# is within the max depth
pcd_test = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
# depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)
ptcld_test = np.asarray(pcd_test.points)
pre_truncate_len = len(ptcld_test)
ptcld_test = ptcld_test[ptcld_test[:,2] < self.max_depth]
# require some fraction of the points to be within the max depth
if len(ptcld_test) < self.within_depth_frac*pre_truncate_len:
continue

pcd = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)
depth_obj = copy.deepcopy(depth_data)
if self.erosion_element is not None:
eroded_mask = cv.erode(mask, self.erosion_element)
depth_obj[eroded_mask==0] = 0
else:
depth_obj[mask==0] = 0
logger.debug(f"img_depth type {depth_data.dtype}, shape={depth_data.shape}")

# Extract point cloud without truncation to heuristically check if enough of the object
# is within the max depth
pcd_test = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
# depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)
ptcld_test = np.asarray(pcd_test.points)
pre_truncate_len = len(ptcld_test)
ptcld_test = ptcld_test[ptcld_test[:,2] < self.max_depth]
# require some fraction of the points to be within the max depth
if len(ptcld_test) < self.within_depth_frac*pre_truncate_len:
continue

pcd = o3d.geometry.PointCloud.create_from_depth_image(
o3d.geometry.Image(np.ascontiguousarray(depth_obj).astype(np.uint16)),
self.depth_cam_intrinsics,
depth_scale=self.depth_scale,
depth_trunc=self.max_depth,
stride=self.pcd_stride,
project_valid_depth_only=True
)

# shared for depth & rangesens, once PointCloud object is created

pcd.remove_non_finite_points()
pcd_sampled = pcd.voxel_down_sample(voxel_size=self.voxel_size)
if not pcd_sampled.is_empty():
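
For context, a minimal sketch (not part of the diff) of how the new depth_data argument might be supplied when use_pointcloud is True, reusing points_3d and points_2d from the AlignPointCloud sketch above; the call site and variable names are assumptions, not code from this PR.

    # Assumed wiring: the (3D points, integer pixel coordinates) pair becomes depth_data,
    # matching `pcl, pcl_proj = depth_data` in run() above.
    observations = fastsam.run(t, pose, img, depth_data=(points_3d, points_2d))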
Review comment: I think using the header attribute constrains us to only using ROS data. The robot data should all be essentially a list of times and datapoints at those times, so you should be able to get the time with:

And that way, if we want to use point clouds from another source, we aren't accessing a non-existent header.
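
A hypothetical illustration of that suggestion (the reviewer's actual snippet is not visible on this page, and the exact robotdatapy API is assumed): look the message time up from the data class's times array instead of the ROS header, e.g. inside aligned_pointcloud:

    # Illustration only: assumes self.pointcloud_data.times is an array of message times.
    idx = np.argmin(np.abs(np.asarray(self.pointcloud_data.times) - t))
    pointcloud_time = self.pointcloud_data.times[idx]   # instead of pcl.header.stamp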