Swarm-SLAM  1.0.0
C-SLAM Framework
icp_utils.py
Go to the documentation of this file.
1 import cslam.utils.point_cloud2 as pc2_utils
2 from geometry_msgs.msg import Transform
3 from scipy.spatial.transform import Rotation as R
4 from std_msgs.msg import Header
5 from sensor_msgs.msg import PointCloud2, PointField
6 
7 import numpy as np
8 import teaserpp_python
9 import open3d
10 from scipy.spatial import cKDTree
11 import rclpy
12 
13 FIELDS_XYZ = [
14  PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
15  PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
16  PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
17 ]
18 
19 # Partially adapted from https://github.com/MIT-SPARK/TEASER-plusplus/tree/master/examples/teaser_python_fpfh_icp
20 
21 
22 def pcd2xyz(pcd):
23  return np.asarray(pcd.points).T
24 
25 
26 def extract_fpfh(pcd, voxel_size):
27  radius_normal = voxel_size * 2
28  pcd.estimate_normals(
29  open3d.geometry.KDTreeSearchParamHybrid(radius=radius_normal,
30  max_nn=30))
31 
32  radius_feature = voxel_size * 5
33  fpfh = open3d.pipelines.registration.compute_fpfh_feature(
34  pcd,
35  open3d.geometry.KDTreeSearchParamHybrid(radius=radius_feature,
36  max_nn=100))
37  return np.array(fpfh.data).T
38 
39 
40 def find_knn_cpu(feat0, feat1, knn=1, return_distance=False):
41  feat1tree = cKDTree(feat1)
42  dists, nn_inds = feat1tree.query(feat0, k=knn, workers=-1)
43  if return_distance:
44  return nn_inds, dists
45  else:
46  return nn_inds
47 
48 
49 def find_correspondences(feats0, feats1, mutual_filter=True):
50  nns01 = find_knn_cpu(feats0, feats1, knn=1, return_distance=False)
51  corres01_idx0 = np.arange(len(nns01))
52  corres01_idx1 = nns01
53 
54  if not mutual_filter:
55  return corres01_idx0, corres01_idx1
56 
57  nns10 = find_knn_cpu(feats1, feats0, knn=1, return_distance=False)
58  corres10_idx1 = np.arange(len(nns10))
59  corres10_idx0 = nns10
60 
61  mutual_filter = (corres10_idx0[corres01_idx1] == corres01_idx0)
62  corres_idx0 = corres01_idx0[mutual_filter]
63  corres_idx1 = corres01_idx1[mutual_filter]
64 
65  return corres_idx0, corres_idx1
66 
67 
68 def get_teaser_solver(noise_bound):
69  solver_params = teaserpp_python.RobustRegistrationSolver.Params()
70  solver_params.cbar2 = 1.0
71  solver_params.noise_bound = noise_bound
72  solver_params.estimate_scaling = False
73  solver_params.inlier_selection_mode = \
74  teaserpp_python.RobustRegistrationSolver.INLIER_SELECTION_MODE.PMC_EXACT
75  solver_params.rotation_tim_graph = \
76  teaserpp_python.RobustRegistrationSolver.INLIER_GRAPH_FORMULATION.CHAIN
77  solver_params.rotation_estimation_algorithm = \
78  teaserpp_python.RobustRegistrationSolver.ROTATION_ESTIMATION_ALGORITHM.GNC_TLS
79  solver_params.rotation_gnc_factor = 1.4
80  solver_params.rotation_max_iterations = 10000
81  solver_params.rotation_cost_threshold = 1e-16
82  solver = teaserpp_python.RobustRegistrationSolver(solver_params)
83  return solver
84 
85 
86 def Rt2T(R, t):
87  T = np.identity(4)
88  T[:3, :3] = R
89  T[:3, 3] = t
90  return T
91 
92 
93 def downsample(points, voxel_size):
94  open3d_cloud = open3d.geometry.PointCloud()
95  open3d_cloud.points = open3d.utility.Vector3dVector(points)
96  return open3d_cloud.voxel_down_sample(voxel_size=voxel_size)
97 
98 
99 def solve_teaser(src, dst, voxel_size, min_inliers):
100  src_feats = extract_fpfh(src, voxel_size)
101  dst_feats = extract_fpfh(dst, voxel_size)
102 
103  corrs_src, corrs_dst = find_correspondences(src_feats,
104  dst_feats,
105  mutual_filter=True)
106 
107  src_xyz = pcd2xyz(src) # np array of size 3 by N
108  dst_xyz = pcd2xyz(dst) # np array of size 3 by M
109  src_corr = src_xyz[:, corrs_src] # np array of size 3 by num_corrs
110  dst_corr = dst_xyz[:, corrs_dst] # np array of size 3 by num_corrs
111 
112  solver = get_teaser_solver(voxel_size)
113  solver.solve(src_corr, dst_corr)
114 
115  solution = solver.getSolution()
116 
117  valid = len(solver.getInlierMaxClique()) > min_inliers
118 
119  if valid:
120  # ICP refinement
121  T_teaser = Rt2T(solution.rotation, solution.translation)
122  icp_sol = open3d.pipelines.registration.registration_icp(
123  src, dst, voxel_size, T_teaser,
124  open3d.pipelines.registration.TransformationEstimationPointToPoint(
125  ),
126  open3d.pipelines.registration.ICPConvergenceCriteria(
127  max_iteration=100))
128  T_icp = icp_sol.transformation
129  solution.translation = T_icp[:3, 3]
130  solution.rotation = T_icp[:3, :3]
131  else:
132  rclpy.logging.get_logger('cslam').info(
133  'Failed to compute loop closure. Number of inliers ( {} / {} )'.
134  format(len(solver.getInlierMaxClique()), min_inliers))
135  return valid, solution.translation, solution.rotation
136 
137 
138 def to_transform_msg(translation, rotation):
139  T = Transform()
140  T.translation.x = translation[0]
141  T.translation.y = translation[1]
142  T.translation.z = translation[2]
143  rotation_matrix = R.from_matrix(np.array(rotation))
144  q = rotation_matrix.as_quat()
145  T.rotation.x = q[0]
146  T.rotation.y = q[1]
147  T.rotation.z = q[2]
148  T.rotation.w = q[3]
149  return T
150 
151 
152 def open3d_to_ros(open3d_cloud):
153  header = Header()
154  fields = FIELDS_XYZ
155  points = np.asarray(open3d_cloud.points)
156  return pc2_utils.create_cloud(header, fields, points)
157 
158 
159 def ros_to_open3d(msg):
160  points = ros_pointcloud_to_points(msg)
161  open3d_cloud = open3d.geometry.PointCloud()
162  open3d_cloud.points = open3d.utility.Vector3dVector(points)
163  return open3d_cloud
164 
165 
167  return pc2_utils.read_points_numpy_filtered(pc_msg)[:, :3]
168 
169 
170 def downsample_ros_pointcloud(pc_msg, voxel_size):
171  points = ros_pointcloud_to_points(pc_msg)
172  return downsample(points, voxel_size)
173 
174 def compute_transform(src, dst, voxel_size, min_inliers):
175  """Computes a 3D transform between 2 point clouds using TEASER++.
176 
177  Be careful: TEASER++ computes the transform from dst to src.
178 
179  Args:
180  src (Open3D point cloud): pointcloud from
181  dst (Open3D point cloud): pointcloud to
182  voxel_size (int): Voxel size
183  min_inliers (int): Minimum number of inlier points correspondance to consider the registration a success
184 
185  Returns:
186  (Transform, bool): transform and success flag
187  """
188  valid, translation, rotation = solve_teaser(src, dst, voxel_size,
189  min_inliers)
190  success = valid
191  transform = to_transform_msg(translation, rotation)
192  return transform, success
def find_correspondences(feats0, feats1, mutual_filter=True)
Definition: icp_utils.py:49
def ros_to_open3d(msg)
Definition: icp_utils.py:159
def pcd2xyz(pcd)
Definition: icp_utils.py:22
def ros_pointcloud_to_points(pc_msg)
Definition: icp_utils.py:166
def downsample(points, voxel_size)
Definition: icp_utils.py:93
def downsample_ros_pointcloud(pc_msg, voxel_size)
Definition: icp_utils.py:170
def find_knn_cpu(feat0, feat1, knn=1, return_distance=False)
Definition: icp_utils.py:40
def open3d_to_ros(open3d_cloud)
Definition: icp_utils.py:152
def extract_fpfh(pcd, voxel_size)
Definition: icp_utils.py:26
def get_teaser_solver(noise_bound)
Definition: icp_utils.py:68
def compute_transform(src, dst, voxel_size, min_inliers)
Definition: icp_utils.py:174
def to_transform_msg(translation, rotation)
Definition: icp_utils.py:138
def solve_teaser(src, dst, voxel_size, min_inliers)
Definition: icp_utils.py:99
def Rt2T(R, t)
Definition: icp_utils.py:86