Open3D를 활용한 Ransac 예시이다.

흰색 점군은 ransac으로 처리된 결과이고, 나머지 포인트는 raw point cloud 데이터이다.

바닥 부분이 사라진 것을 확인할 수 있다.

스크린캐스트 11-08-2025 11:41:50 PM.webm

#!/usr/bin/env python3
import math
import numpy as np
import rclpy
from rclpy.node import Node

from sensor_msgs.msg import PointCloud2, PointField
from sensor_msgs_py import point_cloud2 as pc2  # 공식 파이썬 유틸
from std_msgs.msg import Header

import open3d as o3d

import tf2_ros
from tf2_ros import TransformException
from tf2_sensor_msgs.tf2_sensor_msgs import do_transform_cloud

from rcl_interfaces.msg import ParameterType, ParameterDescriptor

def pointcloud2_to_xyz(msg: PointCloud2) -> np.ndarray:
    """PointCloud2 → (N,3) float32 numpy array (x,y,z). NaN/inf 제외."""
    # structured array 방지: tuple -> list of list -> float32 ndarray
    try:
        pts = [[x, y, z] for (x, y, z) in pc2.read_points(
            msg, field_names=("x", "y", "z"), skip_nans=True
        )]
    except Exception:
        # 혹시 일부 드라이버가 namedtuple/np.void를 줄 때 방어
        pts = []
        for r in pc2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True):
            try:
                pts.append([r[0], r[1], r[2]])
            except Exception:
                pts.append([getattr(r, 'x'), getattr(r, 'y'), getattr(r, 'z')])

    if not pts:
        return np.empty((0, 3), dtype=np.float32)
    return np.asarray(pts, dtype=np.float32)

def xyz_to_pointcloud2(xyz: np.ndarray, frame_id: str, stamp=None) -> PointCloud2:
    """(N,3) float32 → PointCloud2"""
    header = Header()
    header.frame_id = frame_id
    if stamp is not None:
        header.stamp = stamp

    # fields 정의 (x,y,z)
    fields = [
        PointField(name='x', offset=0,  datatype=PointField.FLOAT32, count=1),
        PointField(name='y', offset=4,  datatype=PointField.FLOAT32, count=1),
        PointField(name='z', offset=8,  datatype=PointField.FLOAT32, count=1),
    ]
    return pc2.create_cloud(header, fields, xyz.tolist())

class GroundRemoveOpen3D(Node):
    def __init__(self):
        super().__init__('ground_remove_open3d')

        # === 파라미터 ===
        self.declare_parameter('input_topic', '/velodyne_points')
        self.declare_parameter('output_topic', '/velodyne')
        self.declare_parameter('use_tf', True)
        self.declare_parameter('target_frame', 'base_link')  # TF 변환 목적 프레임
        self.declare_parameter('voxel_leaf', 0.05)           # m
        self.declare_parameter('ransac_dist_thresh', 0.05)   # m
        self.declare_parameter('ransac_iter', 500)
        #self.declare_parameter('z_pass_min', None)           # 예: -0.1 (m)  사용 안하려면 None
        #self.declare_parameter('z_pass_max', None)           # 예:  0.15 (m)
        
        # 변경 (동적 타이핑 허용)
        self.declare_parameter('z_pass_min', None,
            descriptor=ParameterDescriptor(dynamic_typing=True))
        self.declare_parameter('z_pass_max', None,
            descriptor=ParameterDescriptor(dynamic_typing=True))
            
        self.input_topic = self.get_parameter('input_topic').get_parameter_value().string_value
        self.output_topic = self.get_parameter('output_topic').get_parameter_value().string_value
        self.use_tf = self.get_parameter('use_tf').get_parameter_value().bool_value
        self.target_frame = self.get_parameter('target_frame').get_parameter_value().string_value
        self.voxel_leaf = self.get_parameter('voxel_leaf').get_parameter_value().double_value
        self.ransac_dist = self.get_parameter('ransac_dist_thresh').get_parameter_value().double_value
        self.ransac_iter = self.get_parameter('ransac_iter').get_parameter_value().integer_value

        #zmin_param = self.get_parameter('z_pass_min').get_parameter_value()
        #zmax_param = self.get_parameter('z_pass_max').get_parameter_value()
        #self.z_pass_min = zmin_param.double_value if zmin_param.type == zmin_param.Type.DOUBLE else None
        #self.z_pass_max = zmax_param.double_value if zmax_param.type == zmax_param.Type.DOUBLE else None

        # 변경
        p_zmin = self.get_parameter('z_pass_min').get_parameter_value()
        p_zmax = self.get_parameter('z_pass_max').get_parameter_value()

        def _param_to_float_or_none(pv):
            if pv.type == ParameterType.PARAMETER_DOUBLE:
                return float(pv.double_value)
            if pv.type == ParameterType.PARAMETER_INTEGER:
                return float(pv.integer_value)
            return None  # NOT_SET, STRING, BOOL 등은 무시

        self.z_pass_min = _param_to_float_or_none(p_zmin)
        self.z_pass_max = _param_to_float_or_none(p_zmax)

        # TF buffer/listener
        self.tf_buffer = tf2_ros.Buffer(cache_time=rclpy.duration.Duration(seconds=10.0))
        self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self)

        # Pub/Sub
        self.sub = self.create_subscription(PointCloud2, self.input_topic, self.cb_cloud, 10)
        self.pub = self.create_publisher(PointCloud2, self.output_topic, 10)

        self.get_logger().info(
            f"[GroundRemoveOpen3D] Sub: {self.input_topic}  → Pub: {self.output_topic} "
            f"(use_tf={self.use_tf}, target_frame={self.target_frame})"
        )

    def cb_cloud(self, msg: PointCloud2):
        # 1) (옵션) TF 변환
        cloud_msg = msg
        out_frame = msg.header.frame_id

        if self.use_tf:
            try:
                transform = self.tf_buffer.lookup_transform(
                    self.target_frame,  # target
                    msg.header.frame_id,  # source
                    rclpy.time.Time())  # latest
                cloud_msg = do_transform_cloud(msg, transform)
                out_frame = self.target_frame
            except TransformException as ex:
                self.get_logger().warn(f"TF transform failed: {ex}")
                return

        # 2) PointCloud2 → numpy (N,3)
        xyz = pointcloud2_to_xyz(cloud_msg)
        if xyz.shape[0] == 0:
            return

        # 4) Open3D 변환
        pcd = o3d.geometry.PointCloud()
        pcd.points = o3d.utility.Vector3dVector(xyz.astype(np.float64))

        # 5) Voxel downsample
        if self.voxel_leaf > 0.0:
            pcd = pcd.voxel_down_sample(voxel_size=self.voxel_leaf)
            if np.asarray(pcd.points).shape[0] == 0:
                out = xyz_to_pointcloud2(np.empty((0, 3), dtype=np.float32), out_frame, stamp=msg.header.stamp)
                self.pub.publish(out)
                return

        # 6) RANSAC 평면 분할 (ground 추정) → inliers 제거
        plane_model, inliers = pcd.segment_plane(distance_threshold=self.ransac_dist,
                                                 ransac_n=3,
                                                 num_iterations=self.ransac_iter)
        # plane: ax + by + cz + d = 0
        # self.get_logger().debug(f"Plane coeff: {plane_model}")

        if len(inliers) > 0:
            pcd_no_ground = pcd.select_by_index(inliers, invert=True)
        else:
            pcd_no_ground = pcd  # 평면 못 찾으면 그대로

        out_xyz = np.asarray(pcd_no_ground.points).astype(np.float32)

        # 7) republish PointCloud2
        out_msg = xyz_to_pointcloud2(out_xyz, out_frame, stamp=msg.header.stamp)
        self.pub.publish(out_msg)

def main():
    rclpy.init()
    node = GroundRemoveOpen3D()
    try:
        rclpy.spin(node)
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()