Swarm-SLAM  1.0.0
C-SLAM Framework
point_cloud2.py
Go to the documentation of this file.
1 # Copyright 2008 Willow Garage, Inc.
2 #
3 # Redistribution and use in source and binary forms, with or without
4 # modification, are permitted provided that the following conditions are met:
5 #
6 # * Redistributions of source code must retain the above copyright
7 # notice, this list of conditions and the following disclaimer.
8 #
9 # * Redistributions in binary form must reproduce the above copyright
10 # notice, this list of conditions and the following disclaimer in the
11 # documentation and/or other materials provided with the distribution.
12 #
13 # * Neither the name of the Willow Garage, Inc. nor the names of its
14 # contributors may be used to endorse or promote products derived from
15 # this software without specific prior written permission.
16 #
17 # THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
18 # AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
19 # IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
20 # ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
21 # LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
22 # CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
23 # SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
24 # INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
25 # CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
26 # ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
27 # POSSIBILITY OF SUCH DAMAGE.
28 
29 
30 # Serialization of sensor_msgs.PointCloud2 messages.
31 
32 import array
33 from collections import namedtuple
34 import sys
35 from typing import Iterable, List, NamedTuple, Optional
36 
37 import numpy as np
38 try:
39  from numpy.lib.recfunctions import (structured_to_unstructured, unstructured_to_structured)
40 except ImportError:
41  from sensor_msgs_py.numpy_compat import (structured_to_unstructured,
42  unstructured_to_structured)
43 
44 from sensor_msgs.msg import PointCloud2, PointField
45 from std_msgs.msg import Header
46 
47 
48 _DATATYPES = {}
49 _DATATYPES[PointField.INT8] = np.dtype(np.int8)
50 _DATATYPES[PointField.UINT8] = np.dtype(np.uint8)
51 _DATATYPES[PointField.INT16] = np.dtype(np.int16)
52 _DATATYPES[PointField.UINT16] = np.dtype(np.uint16)
53 _DATATYPES[PointField.INT32] = np.dtype(np.int32)
54 _DATATYPES[PointField.UINT32] = np.dtype(np.uint32)
55 _DATATYPES[PointField.FLOAT32] = np.dtype(np.float32)
56 _DATATYPES[PointField.FLOAT64] = np.dtype(np.float64)
57 
58 DUMMY_FIELD_PREFIX = 'unnamed_field'
59 
60 
62  cloud: PointCloud2,
63  field_names: Optional[List[str]] = None,
64  skip_nans: bool = False,
65  uvs: Optional[Iterable] = None,
66  reshape_organized_cloud: bool = False) -> np.ndarray:
67  """
68  Read points from a sensor_msgs.PointCloud2 message.
69 
70  :param cloud: The point cloud to read from sensor_msgs.PointCloud2.
71  :param field_names: The names of fields to read. If None, read all fields.
72  (Type: Iterable, Default: None)
73  :param skip_nans: If True, then don't return any point with a NaN value.
74  (Type: Bool, Default: False)
75  :param uvs: If specified, then only return the points at the given
76  coordinates. (Type: Iterable, Default: None)
77  :param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set.
78  :return: Structured NumPy array containing all points.
79  """
80  assert isinstance(cloud, PointCloud2), \
81  'Cloud is not a sensor_msgs.msg.PointCloud2'
82 
83  # Cast bytes to numpy array
84  points = np.ndarray(
85  shape=(cloud.width * cloud.height, ),
86  dtype=dtype_from_fields(cloud.fields, point_step=cloud.point_step),
87  buffer=cloud.data)
88 
89  # Keep only the requested fields
90  if field_names is not None:
91  assert all(field_name in points.dtype.names for field_name in field_names), \
92  'Requests field is not in the fields of the PointCloud!'
93  # Mask fields
94  points = points[list(field_names)]
95 
96  # Swap array if byte order does not match
97  if bool(sys.byteorder != 'little') != bool(cloud.is_bigendian):
98  points = points.byteswap(inplace=True)
99 
100  # Check if we want to drop points with nan values
101  if skip_nans and not cloud.is_dense:
102  # Init mask which selects all points
103  not_nan_mask = np.ones(len(points), dtype=bool)
104  for field_name in points.dtype.names:
105  # Only keep points without any non values in the mask
106  not_nan_mask = np.logical_and(
107  not_nan_mask, ~np.isnan(points[field_name]))
108  # Select these points
109  points = points[not_nan_mask]
110 
111  # Select points indexed by the uvs field
112  if uvs is not None:
113  # Don't convert to numpy array if it is already one
114  if not isinstance(uvs, np.ndarray):
115  uvs = np.fromiter(uvs, int)
116  # Index requested points
117  points = points[uvs]
118 
119  # Cast into 2d array if cloud is 'organized'
120  if reshape_organized_cloud and cloud.height > 1:
121  points = points.reshape(cloud.width, cloud.height)
122 
123  return points
124 
125 
127  cloud: PointCloud2,
128  field_names: Optional[List[str]] = None,
129  skip_nans: bool = False,
130  uvs: Optional[Iterable] = None,
131  reshape_organized_cloud: bool = False) -> np.ndarray:
132  """
133  Read equally typed fields from sensor_msgs.PointCloud2 message as a unstructured numpy array.
134 
135  This method is better suited if one wants to perform math operations
136  on e.g. all x,y,z fields.
137  But it is limited to fields with the same dtype as unstructured numpy arrays
138  only contain one dtype.
139 
140  :param cloud: The point cloud to read from sensor_msgs.PointCloud2.
141  :param field_names: The names of fields to read. If None, read all fields.
142  (Type: Iterable, Default: None)
143  :param skip_nans: If True, then don't return any point with a NaN value.
144  (Type: Bool, Default: False)
145  :param uvs: If specified, then only return the points at the given
146  coordinates. (Type: Iterable, Default: None)
147  :param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set.
148  :return: Numpy array containing all points.
149  """
150  assert all(cloud.fields[0].datatype == field.datatype for field in cloud.fields[1:]), \
151  'All fields need to have the same datatype. Use `read_points()` otherwise.'
152  structured_numpy_array = read_points(
153  cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
154  return structured_to_unstructured(structured_numpy_array)
155 
156 
158  cloud: PointCloud2,
159  skip_nans: bool = False,
160  uvs: Optional[Iterable] = None,
161  reshape_organized_cloud: bool = False) -> np.ndarray:
162  """
163  Read equally typed fields from sensor_msgs.PointCloud2 message as a unstructured numpy array.
164 
165  This method is better suited if one wants to perform math operations
166  on e.g. all x,y,z fields.
167  But it is limited to fields with the same dtype as unstructured numpy arrays
168  only contain one dtype.
169 
170  :param cloud: The point cloud to read from sensor_msgs.PointCloud2.
171  :param field_names: The names of fields to read. If None, read all fields.
172  (Type: Iterable, Default: None)
173  :param skip_nans: If True, then don't return any point with a NaN value.
174  (Type: Bool, Default: False)
175  :param uvs: If specified, then only return the points at the given
176  coordinates. (Type: Iterable, Default: None)
177  :param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set.
178  :return: Numpy array containing all points.
179  """
180  field_names = [field.name for field in cloud.fields if field.name in ['x', 'y', 'z']]
181  structured_numpy_array = read_points(
182  cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
183  return structured_to_unstructured(structured_numpy_array)
184 
186  cloud: PointCloud2,
187  field_names: Optional[List[str]] = None,
188  skip_nans: bool = False,
189  uvs: Optional[Iterable] = None) -> List[NamedTuple]:
190  """
191  Read points from a sensor_msgs.PointCloud2 message.
192 
193  This function returns a list of namedtuples. It operates on top of the
194  read_points method. For more efficient access use read_points directly.
195 
196  :param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2)
197  :param field_names: The names of fields to read. If None, read all fields.
198  (Type: Iterable, Default: None)
199  :param skip_nans: If True, then don't return any point with a NaN value.
200  (Type: Bool, Default: False)
201  :param uvs: If specified, then only return the points at the given
202  coordinates. (Type: Iterable, Default: None]
203  :return: List of namedtuples containing the values for each point
204  """
205  assert isinstance(cloud, PointCloud2), \
206  'cloud is not a sensor_msgs.msg.PointCloud2'
207 
208  if field_names is None:
209  field_names = [f.name for f in cloud.fields]
210 
211  Point = namedtuple('Point', field_names)
212 
213  return [Point._make(p) for p in read_points(cloud, field_names,
214  skip_nans, uvs)]
215 
216 
217 def dtype_from_fields(fields: Iterable[PointField], point_step: Optional[int] = None) -> np.dtype:
218  """
219  Convert a Iterable of sensor_msgs.msg.PointField messages to a np.dtype.
220 
221  :param fields: The point cloud fields.
222  (Type: iterable of sensor_msgs.msg.PointField)
223  :param point_step: Point step size in bytes. Calculated from the given fields by default.
224  (Type: optional of integer)
225  :returns: NumPy datatype
226  """
227  # Create a lists containing the names, offsets and datatypes of all fields
228  field_names = []
229  field_offsets = []
230  field_datatypes = []
231  for i, field in enumerate(fields):
232  # Datatype as numpy datatype
233  datatype = _DATATYPES[field.datatype]
234  # Name field
235  if field.name == '':
236  name = f'{DUMMY_FIELD_PREFIX}_{i}'
237  else:
238  name = field.name
239  # Handle fields with count > 1 by creating subfields with a suffix consiting
240  # of "_" followed by the subfield counter [0 -> (count - 1)]
241  assert field.count > 0, "Can't process fields with count = 0."
242  for a in range(field.count):
243  # Add suffix if we have multiple subfields
244  if field.count > 1:
245  subfield_name = f'{name}_{a}'
246  else:
247  subfield_name = name
248  assert subfield_name not in field_names, 'Duplicate field names are not allowed!'
249  field_names.append(subfield_name)
250  # Create new offset that includes subfields
251  field_offsets.append(field.offset + a * datatype.itemsize)
252  field_datatypes.append(datatype.str)
253 
254  # Create dtype
255  dtype_dict = {
256  'names': field_names,
257  'formats': field_datatypes,
258  'offsets': field_offsets
259  }
260  if point_step is not None:
261  dtype_dict['itemsize'] = point_step
262  return np.dtype(dtype_dict)
263 
264 
266  header: Header,
267  fields: Iterable[PointField],
268  points: Iterable) -> PointCloud2:
269  """
270  Create a sensor_msgs.msg.PointCloud2 message.
271 
272  :param header: The point cloud header. (Type: std_msgs.msg.Header)
273  :param fields: The point cloud fields.
274  (Type: iterable of sensor_msgs.msg.PointField)
275  :param points: The point cloud points. List of iterables, i.e. one iterable
276  for each point, with the elements of each iterable being the
277  values of the fields for that point (in the same order as
278  the fields parameter)
279  :return: The point cloud as sensor_msgs.msg.PointCloud2
280  """
281  # Check if input is numpy array
282  if isinstance(points, np.ndarray):
283  # Check if this is an unstructured array
284  if points.dtype.names is None:
285  assert all(fields[0].datatype == field.datatype for field in fields[1:]), \
286  'All fields need to have the same datatype. Pass a structured NumPy array \
287  with multiple dtypes otherwise.'
288  # Convert unstructured to structured array
289  points = unstructured_to_structured(
290  points,
291  dtype=dtype_from_fields(fields))
292  else:
293  assert points.dtype == dtype_from_fields(fields), \
294  'PointFields and structured NumPy array dtype do not match for all fields! \
295  Check their field order, names and types.'
296  else:
297  # Cast python objects to structured NumPy array (slow)
298  points = np.array(
299  # Points need to be tuples in the structured array
300  list(map(tuple, points)),
301  dtype=dtype_from_fields(fields))
302 
303  # Handle organized clouds
304  assert len(points.shape) <= 2, \
305  'Too many dimensions for organized cloud! \
306  Points can only be organized in max. two dimensional space'
307  height = 1
308  width = points.shape[0]
309  # Check if input points are an organized cloud (2D array of points)
310  if len(points.shape) == 2:
311  height = points.shape[1]
312 
313  # Convert numpy points to array.array
314  memory_view = memoryview(points)
315  casted = memory_view.cast('B')
316  array_array = array.array('B')
317  array_array.frombytes(casted)
318 
319  # Put everything together
320  cloud = PointCloud2(
321  header=header,
322  height=height,
323  width=width,
324  is_dense=False,
325  is_bigendian=sys.byteorder != 'little',
326  fields=fields,
327  point_step=points.dtype.itemsize,
328  row_step=(points.dtype.itemsize * width))
329  # Set cloud via property instead of the constructor because of the bug described in
330  # https://github.com/ros2/common_interfaces/issues/176
331  cloud.data = array_array
332  return cloud
333 
334 
335 def create_cloud_xyz32(header: Header, points: Iterable) -> PointCloud2:
336  """
337  Create a sensor_msgs.msg.PointCloud2 message with (x, y, z) fields.
338 
339  :param header: The point cloud header. (Type: std_msgs.msg.Header)
340  :param points: The point cloud points. (Type: Iterable)
341  :return: The point cloud as sensor_msgs.msg.PointCloud2.
342  """
343  fields = [PointField(name='x', offset=0,
344  datatype=PointField.FLOAT32, count=1),
345  PointField(name='y', offset=4,
346  datatype=PointField.FLOAT32, count=1),
347  PointField(name='z', offset=8,
348  datatype=PointField.FLOAT32, count=1)]
349  return create_cloud(header, fields, points)
np.dtype dtype_from_fields(Iterable[PointField] fields, Optional[int] point_step=None)
np.ndarray read_points(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)
Definition: point_cloud2.py:66
List[NamedTuple] read_points_list(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None)
np.ndarray read_points_numpy_filtered(PointCloud2 cloud, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)
PointCloud2 create_cloud_xyz32(Header header, Iterable points)
PointCloud2 create_cloud(Header header, Iterable[PointField] fields, Iterable points)
np.ndarray read_points_numpy(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)