33 from collections 
import namedtuple
 
   35 from typing 
import Iterable, List, NamedTuple, Optional
 
   39     from numpy.lib.recfunctions 
import (structured_to_unstructured, unstructured_to_structured)
 
   41     from sensor_msgs_py.numpy_compat 
import (structured_to_unstructured,
 
   42                                              unstructured_to_structured)
 
   44 from sensor_msgs.msg 
import PointCloud2, PointField
 
   45 from std_msgs.msg 
import Header
 
   49 _DATATYPES[PointField.INT8] = np.dtype(np.int8)
 
   50 _DATATYPES[PointField.UINT8] = np.dtype(np.uint8)
 
   51 _DATATYPES[PointField.INT16] = np.dtype(np.int16)
 
   52 _DATATYPES[PointField.UINT16] = np.dtype(np.uint16)
 
   53 _DATATYPES[PointField.INT32] = np.dtype(np.int32)
 
   54 _DATATYPES[PointField.UINT32] = np.dtype(np.uint32)
 
   55 _DATATYPES[PointField.FLOAT32] = np.dtype(np.float32)
 
   56 _DATATYPES[PointField.FLOAT64] = np.dtype(np.float64)
 
   58 DUMMY_FIELD_PREFIX = 
'unnamed_field' 
   63         field_names: Optional[List[str]] = 
None,
 
   64         skip_nans: bool = 
False,
 
   65         uvs: Optional[Iterable] = 
None,
 
   66         reshape_organized_cloud: bool = 
False) -> np.ndarray:
 
   68     Read points from a sensor_msgs.PointCloud2 message. 
   70     :param cloud: The point cloud to read from sensor_msgs.PointCloud2. 
   71     :param field_names: The names of fields to read. If None, read all fields. 
   72                         (Type: Iterable, Default: None) 
   73     :param skip_nans: If True, then don't return any point with a NaN value. 
   74                       (Type: Bool, Default: False) 
   75     :param uvs: If specified, then only return the points at the given 
   76         coordinates. (Type: Iterable, Default: None) 
   77     :param reshape_organized_cloud: Returns the array as a 2D organized point cloud if set. 
   78     :return: Structured NumPy array containing all points. 
   80     assert isinstance(cloud, PointCloud2), \
 
   81         'Cloud is not a sensor_msgs.msg.PointCloud2' 
   85         shape=(cloud.width * cloud.height, ),
 
   90     if field_names 
is not None:
 
   91         assert all(field_name 
in points.dtype.names 
for field_name 
in field_names), \
 
   92             'Requests field is not in the fields of the PointCloud!' 
   94         points = points[list(field_names)]
 
   97     if bool(sys.byteorder != 
'little') != bool(cloud.is_bigendian):
 
   98         points = points.byteswap(inplace=
True)
 
  101     if skip_nans 
and not cloud.is_dense:
 
  103         not_nan_mask = np.ones(len(points), dtype=bool)
 
  104         for field_name 
in points.dtype.names:
 
  106             not_nan_mask = np.logical_and(
 
  107                 not_nan_mask, ~np.isnan(points[field_name]))
 
  109         points = points[not_nan_mask]
 
  114         if not isinstance(uvs, np.ndarray):
 
  115             uvs = np.fromiter(uvs, int)
 
  120     if reshape_organized_cloud 
and cloud.height > 1:
 
  121         points = points.reshape(cloud.width, cloud.height)
 
  128         field_names: Optional[List[str]] = 
None,
 
  129         skip_nans: bool = 
False,
 
  130         uvs: Optional[Iterable] = 
None,
 
  131         reshape_organized_cloud: bool = 
False) -> np.ndarray:
 
  133     Read equally typed fields from a sensor_msgs.PointCloud2 message as an unstructured numpy array. 
  135     This method is better suited if one wants to perform math operations 
  136     on e.g. all x,y,z fields. 
  137     But it is limited to fields with the same dtype, as unstructured numpy arrays 
  138     only contain one dtype. 
  140     :param cloud: The point cloud to read from sensor_msgs.PointCloud2. 
  141     :param field_names: The names of fields to read. If None, read all fields. 
  142                         (Type: Iterable, Default: None) 
  143     :param skip_nans: If True, then don't return any point with a NaN value. 
  144                       (Type: Bool, Default: False) 
  145     :param uvs: If specified, then only return the points at the given 
  146         coordinates. (Type: Iterable, Default: None) 
  147     :param reshape_organized_cloud: Returns the array as a 2D organized point cloud if set. 
  148     :return: Numpy array containing all points. 
  150     assert all(cloud.fields[0].datatype == field.datatype 
for field 
in cloud.fields[1:]), \
 
  151         'All fields need to have the same datatype. Use `read_points()` otherwise.' 
  153         cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
 
  154     return structured_to_unstructured(structured_numpy_array)
 
  159         skip_nans: bool = 
False,
 
  160         uvs: Optional[Iterable] = 
None,
 
  161         reshape_organized_cloud: bool = 
False) -> np.ndarray:
 
  163     Read the x, y and z fields from a sensor_msgs.PointCloud2 message as an unstructured numpy array. 
  165     This method is better suited if one wants to perform math operations 
  166     on e.g. all x,y,z fields. 
  167     But it is limited to fields with the same dtype, as unstructured numpy arrays 
  168     only contain one dtype. 
  170     :param cloud: The point cloud to read from sensor_msgs.PointCloud2. 
  171         Only the fields named 'x', 'y' and 'z' that are present in the cloud are read; 
  172         this function does not take a field_names argument. 
  173     :param skip_nans: If True, then don't return any point with a NaN value. 
  174                       (Type: Bool, Default: False) 
  175     :param uvs: If specified, then only return the points at the given 
  176         coordinates. (Type: Iterable, Default: None) 
  177     :param reshape_organized_cloud: Returns the array as a 2D organized point cloud if set. 
  178     :return: Numpy array containing all points. 
  180     field_names = [field.name 
for field 
in cloud.fields 
if field.name 
in [
'x', 
'y', 
'z']]
 
  182         cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
 
  183     return structured_to_unstructured(structured_numpy_array)
 
  187         field_names: Optional[List[str]] = 
None,
 
  188         skip_nans: bool = 
False,
 
  189         uvs: Optional[Iterable] = 
None) -> List[NamedTuple]:
 
  191     Read points from a sensor_msgs.PointCloud2 message. 
  193     This function returns a list of namedtuples. It operates on top of the 
  194     read_points method. For more efficient access use read_points directly. 
  196     :param cloud: The point cloud to read from. (Type: sensor_msgs.PointCloud2) 
  197     :param field_names: The names of fields to read. If None, read all fields. 
  198                         (Type: Iterable, Default: None) 
  199     :param skip_nans: If True, then don't return any point with a NaN value. 
  200                       (Type: Bool, Default: False) 
  201     :param uvs: If specified, then only return the points at the given 
  202                 coordinates. (Type: Iterable, Default: None) 
  203     :return: List of namedtuples containing the values for each point 
  205     assert isinstance(cloud, PointCloud2), \
 
  206         'cloud is not a sensor_msgs.msg.PointCloud2' 
  208     if field_names 
is None:
 
  209         field_names = [f.name 
for f 
in cloud.fields]
 
  211     Point = namedtuple(
'Point', field_names)
 
  213     return [Point._make(p) 
for p 
in read_points(cloud, field_names,
 
  217 def dtype_from_fields(fields: Iterable[PointField], point_step: Optional[int] = 
None) -> np.dtype:
 
  219     Convert an iterable of sensor_msgs.msg.PointField messages to a np.dtype. 
  221     :param fields: The point cloud fields. 
  222                    (Type: iterable of sensor_msgs.msg.PointField) 
  223     :param point_step: Point step size in bytes. Calculated from the given fields by default. 
  224                        (Type: optional of integer) 
  225     :returns: NumPy datatype 
  231     for i, field 
in enumerate(fields):
 
  233         datatype = _DATATYPES[field.datatype]
 
  236             name = f
'{DUMMY_FIELD_PREFIX}_{i}' 
  241         assert field.count > 0, 
"Can't process fields with count = 0." 
  242         for a 
in range(field.count):
 
  245                 subfield_name = f
'{name}_{a}' 
  248             assert subfield_name 
not in field_names, 
'Duplicate field names are not allowed!' 
  249             field_names.append(subfield_name)
 
  251             field_offsets.append(field.offset + a * datatype.itemsize)
 
  252             field_datatypes.append(datatype.str)
 
  256             'names': field_names,
 
  257             'formats': field_datatypes,
 
  258             'offsets': field_offsets
 
  260     if point_step 
is not None:
 
  261         dtype_dict[
'itemsize'] = point_step
 
  262     return np.dtype(dtype_dict)
 
  267         fields: Iterable[PointField],
 
  268         points: Iterable) -> PointCloud2:
 
  270     Create a sensor_msgs.msg.PointCloud2 message. 
  272     :param header: The point cloud header. (Type: std_msgs.msg.Header) 
  273     :param fields: The point cloud fields. 
  274                    (Type: iterable of sensor_msgs.msg.PointField) 
  275     :param points: The point cloud points. List of iterables, i.e. one iterable 
  276                    for each point, with the elements of each iterable being the 
  277                    values of the fields for that point (in the same order as 
  278                    the fields parameter) 
  279     :return: The point cloud as sensor_msgs.msg.PointCloud2 
  282     if isinstance(points, np.ndarray):
 
  284         if points.dtype.names 
is None:
 
  285             assert all(fields[0].datatype == field.datatype 
for field 
in fields[1:]), \
 
  286                 'All fields need to have the same datatype. Pass a structured NumPy array \ 
  287                     with multiple dtypes otherwise.' 
  289             points = unstructured_to_structured(
 
  294                 'PointFields and structured NumPy array dtype do not match for all fields! \ 
  295                     Check their field order, names and types.' 
  300             list(map(tuple, points)),
 
  304     assert len(points.shape) <= 2, \
 
  305         'Too many dimensions for organized cloud! \ 
  306             Points can only be organized in max. two dimensional space' 
  308     width = points.shape[0]
 
  310     if len(points.shape) == 2:
 
  311         height = points.shape[1]
 
  314     memory_view = memoryview(points)
 
  315     casted = memory_view.cast(
'B')
 
  316     array_array = array.array(
'B')
 
  317     array_array.frombytes(casted)
 
  325         is_bigendian=sys.byteorder != 
'little',
 
  327         point_step=points.dtype.itemsize,
 
  328         row_step=(points.dtype.itemsize * width))
 
  331     cloud.data = array_array
 
  337     Create a sensor_msgs.msg.PointCloud2 message with (x, y, z) fields. 
  339     :param header: The point cloud header. (Type: std_msgs.msg.Header) 
  340     :param points: The point cloud points. (Type: Iterable) 
  341     :return: The point cloud as sensor_msgs.msg.PointCloud2. 
  343     fields = [PointField(name=
'x', offset=0,
 
  344                          datatype=PointField.FLOAT32, count=1),
 
  345               PointField(name=
'y', offset=4,
 
  346                          datatype=PointField.FLOAT32, count=1),
 
  347               PointField(name=
'z', offset=8,
 
  348                          datatype=PointField.FLOAT32, count=1)]
 
np.dtype dtype_from_fields(Iterable[PointField] fields, Optional[int] point_step=None)
 
np.ndarray read_points(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)
 
List[NamedTuple] read_points_list(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None)
 
np.ndarray read_points_numpy_filtered(PointCloud2 cloud, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)
 
PointCloud2 create_cloud_xyz32(Header header, Iterable points)
 
PointCloud2 create_cloud(Header header, Iterable[PointField] fields, Iterable points)
 
np.ndarray read_points_numpy(PointCloud2 cloud, Optional[List[str]] field_names=None, bool skip_nans=False, Optional[Iterable] uvs=None, bool reshape_organized_cloud=False)