PointCloud2를 Open3d에서 처리한 이후 다시 PointCloud2 형태로 변환하여 Publish 하는 코드이다.
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile, QoSReliabilityPolicy, QoSHistoryPolicy
from sensor_msgs.msg import PointCloud2, PointField
import open3d as o3d
import numpy as np
from std_msgs.msg import Header
from sensor_msgs_py import point_cloud2 as pc2
class MinimalSubscriber(Node):
def __init__(self):
super().__init__('pc2_to_open3d_demo')
qos = QoSProfile(
reliability=QoSReliabilityPolicy.BEST_EFFORT,
history=QoSHistoryPolicy.KEEP_LAST,
depth=1
)
self.create_subscription(PointCloud2, '/velodyne_points', self.cb, qos)
self.o3d_publisher = self.create_publisher(PointCloud2, 'o3d_points', 10)
def cb(self, msg: PointCloud2):
try:
pcd = self.pointcloud2_to_o3d(msg)
# pcd = pcd.voxel_down_sample(voxel_size=0.3)
pcd_out = self.o3d_to_pointcloud2(
pcd,
frame_id=msg.header.frame_id,
stamp=msg.header.stamp
)
self.o3d_publisher.publish(pcd_out)
except Exception as e:
self.get_logger().warn(f"conversion failed in cb: {e}")
# ---------- Open3D -> PointCloud2 ----------
def o3d_to_pointcloud2(self, pcd: o3d.geometry.PointCloud, frame_id: str, stamp=None) -> PointCloud2:
# points
xyz = np.asarray(pcd.points, dtype=np.float32) # (N,3) float32
if xyz.size == 0:
header = Header()
header.stamp = stamp
header.frame_id = frame_id
return pc2.create_cloud(header, [], [])
# colors(optional)
use_rgb = pcd.has_colors()
if use_rgb:
cols = np.asarray(pcd.colors, dtype=np.float32) # (N,3) in [0,1]
cols = np.clip(cols, 0.0, 1.0)
cols255 = (cols * 255.0).astype(np.uint8)
# pack into float32 'rgb'
rgb_u32 = (cols255[:,0].astype(np.uint32) << 16) | \\
(cols255[:,1].astype(np.uint32) << 8) | \\
cols255[:,2].astype(np.uint32)
rgb_f32 = rgb_u32.view(np.float32)
data = np.c_[xyz, rgb_f32] # (N,4)
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
PointField(name='rgb', offset=12, datatype=PointField.FLOAT32, count=1),
]
else:
data = xyz # (N,3)
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
]
header = Header()
header.stamp = stamp
header.frame_id = frame_id
return pc2.create_cloud(header, fields, data)
def pointcloud2_to_o3d(self, msg: PointCloud2) -> o3d.geometry.PointCloud:
want_fields = ('x','y','z')
xs, ys, zs = [], [], []
# read_points는 tuple-류를 뱉습니다. 인덱스로 우선 접근, 실패시 getattr로 보조.
for r in pc2.read_points(msg, field_names=want_fields, skip_nans=True): # r format : (x,y,z,intensity)
try:
# tuple 인덱싱 경로
x, y, z = r[0], r[1], r[2]
xs.append(x); ys.append(y); zs.append(z)
except Exception:
# 혹시 namedtuple/np.void 등일 때
x, y, z = getattr(r, 'x'), getattr(r, 'y'), getattr(r, 'z')
xs.append(x); ys.append(y); zs.append(z)
# 포인트 구성
if len(xs) == 0:
return o3d.geometry.PointCloud()
xyz = np.column_stack([xs, ys, zs]).astype(np.float64, copy=False)
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz)
return pcd
def destroy_node(self):
super().destroy_node()
def main():
rclpy.init()
node = MinimalSubscriber()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()