33 from collections
import namedtuple
35 from typing
import Iterable, List, NamedTuple, Optional
39 from numpy.lib.recfunctions
import (structured_to_unstructured, unstructured_to_structured)
41 from sensor_msgs_py.numpy_compat
import (structured_to_unstructured,
42 unstructured_to_structured)
44 from sensor_msgs.msg
import PointCloud2, PointField
45 from std_msgs.msg
import Header
49 _DATATYPES[PointField.INT8] = np.dtype(np.int8)
50 _DATATYPES[PointField.UINT8] = np.dtype(np.uint8)
51 _DATATYPES[PointField.INT16] = np.dtype(np.int16)
52 _DATATYPES[PointField.UINT16] = np.dtype(np.uint16)
53 _DATATYPES[PointField.INT32] = np.dtype(np.int32)
54 _DATATYPES[PointField.UINT32] = np.dtype(np.uint32)
55 _DATATYPES[PointField.FLOAT32] = np.dtype(np.float32)
56 _DATATYPES[PointField.FLOAT64] = np.dtype(np.float64)
58 DUMMY_FIELD_PREFIX =
'unnamed_field'
63 field_names: Optional[List[str]] =
None,
64 skip_nans: bool =
False,
65 uvs: Optional[Iterable] =
None,
66 reshape_organized_cloud: bool =
False) -> np.ndarray:
68 Read points from a sensor_msgs.PointCloud2 message.
70 :param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2)
71 :param field_names: The names of fields to read. If None, read all fields.
72 (Type: Iterable, Default: None)
73 :param skip_nans: If True, then don't return any point with a NaN value.
74 (Type: Bool, Default: False)
75 :param uvs: If specified, then only return the points at the given
76 coordinates. (Type: Iterable, Default: None)
77 :param reshape_organized_cloud: Returns the array as a 2D organized point cloud if set.
78 :return: Structured NumPy array containing all points.
80 assert isinstance(cloud, PointCloud2), \
81 'Cloud is not a sensor_msgs.msg.PointCloud2'
85 shape=(cloud.width * cloud.height, ),
90 if field_names
is not None:
91 assert all(field_name
in points.dtype.names
for field_name
in field_names), \
92 'Requests field is not in the fields of the PointCloud!'
94 points = points[list(field_names)]
97 if bool(sys.byteorder !=
'little') != bool(cloud.is_bigendian):
98 points = points.byteswap(inplace=
True)
101 if skip_nans
and not cloud.is_dense:
103 not_nan_mask = np.ones(len(points), dtype=bool)
104 for field_name
in points.dtype.names:
106 not_nan_mask = np.logical_and(
107 not_nan_mask, ~np.isnan(points[field_name]))
109 points = points[not_nan_mask]
114 if not isinstance(uvs, np.ndarray):
115 uvs = np.fromiter(uvs, int)
120 if reshape_organized_cloud
and cloud.height > 1:
121 points = points.reshape(cloud.width, cloud.height)
128 field_names: Optional[List[str]] =
None,
129 skip_nans: bool =
False,
130 uvs: Optional[Iterable] =
None,
131 reshape_organized_cloud: bool =
False) -> np.ndarray:
133 Read equally typed fields from a sensor_msgs.PointCloud2 message as an unstructured NumPy array.
135 This method is better suited if one wants to perform math operations
136 on e.g. all x,y,z fields.
137 But it is limited to fields with the same dtype, as unstructured NumPy arrays
138 only contain one dtype.
140 :param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2)
141 :param field_names: The names of fields to read. If None, read all fields.
142 (Type: Iterable, Default: None)
143 :param skip_nans: If True, then don't return any point with a NaN value.
144 (Type: Bool, Default: False)
145 :param uvs: If specified, then only return the points at the given
146 coordinates. (Type: Iterable, Default: None)
147 :param reshape_organized_cloud: Returns the array as a 2D organized point cloud if set.
148 :return: Numpy array containing all points.
150 assert all(cloud.fields[0].datatype == field.datatype
for field
in cloud.fields[1:]), \
151 'All fields need to have the same datatype. Use `read_points()` otherwise.'
153 cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
154 return structured_to_unstructured(structured_numpy_array)
159 skip_nans: bool =
False,
160 uvs: Optional[Iterable] =
None,
161 reshape_organized_cloud: bool =
False) -> np.ndarray:
163 Read equally typed fields from a sensor_msgs.PointCloud2 message as an unstructured NumPy array.
165 This method is better suited if one wants to perform math operations
166 on e.g. all x,y,z fields.
167 But it is limited to fields with the same dtype, as unstructured NumPy arrays
168 only contain one dtype.
170 :param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2)
171 :param field_names: The names of fields to read. If None, read all fields.
172 (Type: Iterable, Default: None)
173 :param skip_nans: If True, then don't return any point with a NaN value.
174 (Type: Bool, Default: False)
175 :param uvs: If specified, then only return the points at the given
176 coordinates. (Type: Iterable, Default: None)
177 :param reshape_organized_cloud: Returns the array as a 2D organized point cloud if set.
178 :return: Numpy array containing all points.
180 field_names = [field.name
for field
in cloud.fields
if field.name
in [
'x',
'y',
'z']]
182 cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
183 return structured_to_unstructured(structured_numpy_array)
187 field_names: Optional[List[str]] =
None,
188 skip_nans: bool =
False,
189 uvs: Optional[Iterable] =
None) -> List[NamedTuple]:
191 Read points from a sensor_msgs.PointCloud2 message.
193 This function returns a list of namedtuples. It operates on top of the
194 read_points method. For more efficient access use read_points directly.
196 :param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2)
197 :param field_names: The names of fields to read. If None, read all fields.
198 (Type: Iterable, Default: None)
199 :param skip_nans: If True, then don't return any point with a NaN value.
200 (Type: Bool, Default: False)
201 :param uvs: If specified, then only return the points at the given
202 coordinates. (Type: Iterable, Default: None)
203 :return: List of namedtuples containing the values for each point
205 assert isinstance(cloud, PointCloud2), \
206 'cloud is not a sensor_msgs.msg.PointCloud2'
208 if field_names
is None:
209 field_names = [f.name
for f
in cloud.fields]
211 Point = namedtuple(
'Point', field_names)
213 return [Point._make(p)
for p
in read_points(cloud, field_names,
217 def dtype_from_fields(fields: Iterable[PointField], point_step: Optional[int] =
None) -> np.dtype:
219 Convert an Iterable of sensor_msgs.msg.PointField messages to a np.dtype.
221 :param fields: The point cloud fields.
222 (Type: iterable of sensor_msgs.msg.PointField)
223 :param point_step: Point step size in bytes. Calculated from the given fields by default.
224 (Type: optional of integer)
225 :returns: NumPy datatype
231 for i, field
in enumerate(fields):
233 datatype = _DATATYPES[field.datatype]
236 name = f
'{DUMMY_FIELD_PREFIX}_{i}'
241 assert field.count > 0,
"Can't process fields with count = 0."
242 for a
in range(field.count):
245 subfield_name = f
'{name}_{a}'
248 assert subfield_name
not in field_names,
'Duplicate field names are not allowed!'
249 field_names.append(subfield_name)
251 field_offsets.append(field.offset + a * datatype.itemsize)
252 field_datatypes.append(datatype.str)
256 'names': field_names,
257 'formats': field_datatypes,
258 'offsets': field_offsets
260 if point_step
is not None:
261 dtype_dict[
'itemsize'] = point_step
262 return np.dtype(dtype_dict)
267 fields: Iterable[PointField],
268 points: Iterable) -> PointCloud2:
270 Create a sensor_msgs.msg.PointCloud2 message.
272 :param header: The point cloud header. (Type: std_msgs.msg.Header)
273 :param fields: The point cloud fields.
274 (Type: iterable of sensor_msgs.msg.PointField)
275 :param points: The point cloud points. List of iterables, i.e. one iterable
276 for each point, with the elements of each iterable being the
277 values of the fields for that point (in the same order as
278 the fields parameter)
279 :return: The point cloud as sensor_msgs.msg.PointCloud2
282 if isinstance(points, np.ndarray):
284 if points.dtype.names
is None:
285 assert all(fields[0].datatype == field.datatype
for field
in fields[1:]), \
286 'All fields need to have the same datatype. Pass a structured NumPy array \
287 with multiple dtypes otherwise.'
289 points = unstructured_to_structured(
294 'PointFields and structured NumPy array dtype do not match for all fields! \
295 Check their field order, names and types.'
300 list(map(tuple, points)),
304 assert len(points.shape) <= 2, \
305 'Too many dimensions for organized cloud! \
306 Points can only be organized in max. two dimensional space'
308 width = points.shape[0]
310 if len(points.shape) == 2:
311 height = points.shape[1]
314 memory_view = memoryview(points)
315 casted = memory_view.cast(
'B')
316 array_array = array.array(
'B')
317 array_array.frombytes(casted)
325 is_bigendian=sys.byteorder !=
'little',
327 point_step=points.dtype.itemsize,
328 row_step=(points.dtype.itemsize * width))
331 cloud.data = array_array
337 Create a sensor_msgs.msg.PointCloud2 message with (x, y, z) fields.
339 :param header: The point cloud header. (Type: std_msgs.msg.Header)
340 :param points: The point cloud points. (Type: Iterable)
341 :return: The point cloud as sensor_msgs.msg.PointCloud2.
343 fields = [PointField(name=
'x', offset=0,
344 datatype=PointField.FLOAT32, count=1),
345 PointField(name=
'y', offset=4,
346 datatype=PointField.FLOAT32, count=1),
347 PointField(name=
'z', offset=8,
348 datatype=PointField.FLOAT32, count=1)]
np.dtype dtype_from_fields(Iterable[PointField] fields, Optional[int] point_step=None)
np.ndarray read_points(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)
List[NamedTuple] read_points_list(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None)
np.ndarray read_points_numpy_filtered(PointCloud2 cloud, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)
PointCloud2 create_cloud_xyz32(Header header, Iterable points)
PointCloud2 create_cloud(Header header, Iterable[PointField] fields, Iterable points)
np.ndarray read_points_numpy(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)