Open3D를 활용한 Ransac 예시이다.
흰색 점군은 ransac으로 처리된 결과이고, 나머지 포인트는 raw point cloud 데이터이다.
바닥 부분이 사라진 것을 확인할 수 있다.
스크린캐스트 11-08-2025 11:41:50 PM.webm
#!/usr/bin/env python3
import math
import numpy as np
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import PointCloud2, PointField
from sensor_msgs_py import point_cloud2 as pc2 # 공식 파이썬 유틸
from std_msgs.msg import Header
import open3d as o3d
import tf2_ros
from tf2_ros import TransformException
from tf2_sensor_msgs.tf2_sensor_msgs import do_transform_cloud
from rcl_interfaces.msg import ParameterType, ParameterDescriptor
def pointcloud2_to_xyz(msg: PointCloud2) -> np.ndarray:
"""PointCloud2 → (N,3) float32 numpy array (x,y,z). NaN/inf 제외."""
# structured array 방지: tuple -> list of list -> float32 ndarray
try:
pts = [[x, y, z] for (x, y, z) in pc2.read_points(
msg, field_names=("x", "y", "z"), skip_nans=True
)]
except Exception:
# 혹시 일부 드라이버가 namedtuple/np.void를 줄 때 방어
pts = []
for r in pc2.read_points(msg, field_names=("x", "y", "z"), skip_nans=True):
try:
pts.append([r[0], r[1], r[2]])
except Exception:
pts.append([getattr(r, 'x'), getattr(r, 'y'), getattr(r, 'z')])
if not pts:
return np.empty((0, 3), dtype=np.float32)
return np.asarray(pts, dtype=np.float32)
def xyz_to_pointcloud2(xyz: np.ndarray, frame_id: str, stamp=None) -> PointCloud2:
"""(N,3) float32 → PointCloud2"""
header = Header()
header.frame_id = frame_id
if stamp is not None:
header.stamp = stamp
# fields 정의 (x,y,z)
fields = [
PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
]
return pc2.create_cloud(header, fields, xyz.tolist())
class GroundRemoveOpen3D(Node):
def __init__(self):
super().__init__('ground_remove_open3d')
# === 파라미터 ===
self.declare_parameter('input_topic', '/velodyne_points')
self.declare_parameter('output_topic', '/velodyne')
self.declare_parameter('use_tf', True)
self.declare_parameter('target_frame', 'base_link') # TF 변환 목적 프레임
self.declare_parameter('voxel_leaf', 0.05) # m
self.declare_parameter('ransac_dist_thresh', 0.05) # m
self.declare_parameter('ransac_iter', 500)
#self.declare_parameter('z_pass_min', None) # 예: -0.1 (m) 사용 안하려면 None
#self.declare_parameter('z_pass_max', None) # 예: 0.15 (m)
# 변경 (동적 타이핑 허용)
self.declare_parameter('z_pass_min', None,
descriptor=ParameterDescriptor(dynamic_typing=True))
self.declare_parameter('z_pass_max', None,
descriptor=ParameterDescriptor(dynamic_typing=True))
self.input_topic = self.get_parameter('input_topic').get_parameter_value().string_value
self.output_topic = self.get_parameter('output_topic').get_parameter_value().string_value
self.use_tf = self.get_parameter('use_tf').get_parameter_value().bool_value
self.target_frame = self.get_parameter('target_frame').get_parameter_value().string_value
self.voxel_leaf = self.get_parameter('voxel_leaf').get_parameter_value().double_value
self.ransac_dist = self.get_parameter('ransac_dist_thresh').get_parameter_value().double_value
self.ransac_iter = self.get_parameter('ransac_iter').get_parameter_value().integer_value
#zmin_param = self.get_parameter('z_pass_min').get_parameter_value()
#zmax_param = self.get_parameter('z_pass_max').get_parameter_value()
#self.z_pass_min = zmin_param.double_value if zmin_param.type == zmin_param.Type.DOUBLE else None
#self.z_pass_max = zmax_param.double_value if zmax_param.type == zmax_param.Type.DOUBLE else None
# 변경
p_zmin = self.get_parameter('z_pass_min').get_parameter_value()
p_zmax = self.get_parameter('z_pass_max').get_parameter_value()
def _param_to_float_or_none(pv):
if pv.type == ParameterType.PARAMETER_DOUBLE:
return float(pv.double_value)
if pv.type == ParameterType.PARAMETER_INTEGER:
return float(pv.integer_value)
return None # NOT_SET, STRING, BOOL 등은 무시
self.z_pass_min = _param_to_float_or_none(p_zmin)
self.z_pass_max = _param_to_float_or_none(p_zmax)
# TF buffer/listener
self.tf_buffer = tf2_ros.Buffer(cache_time=rclpy.duration.Duration(seconds=10.0))
self.tf_listener = tf2_ros.TransformListener(self.tf_buffer, self)
# Pub/Sub
self.sub = self.create_subscription(PointCloud2, self.input_topic, self.cb_cloud, 10)
self.pub = self.create_publisher(PointCloud2, self.output_topic, 10)
self.get_logger().info(
f"[GroundRemoveOpen3D] Sub: {self.input_topic} → Pub: {self.output_topic} "
f"(use_tf={self.use_tf}, target_frame={self.target_frame})"
)
def cb_cloud(self, msg: PointCloud2):
# 1) (옵션) TF 변환
cloud_msg = msg
out_frame = msg.header.frame_id
if self.use_tf:
try:
transform = self.tf_buffer.lookup_transform(
self.target_frame, # target
msg.header.frame_id, # source
rclpy.time.Time()) # latest
cloud_msg = do_transform_cloud(msg, transform)
out_frame = self.target_frame
except TransformException as ex:
self.get_logger().warn(f"TF transform failed: {ex}")
return
# 2) PointCloud2 → numpy (N,3)
xyz = pointcloud2_to_xyz(cloud_msg)
if xyz.shape[0] == 0:
return
# 4) Open3D 변환
pcd = o3d.geometry.PointCloud()
pcd.points = o3d.utility.Vector3dVector(xyz.astype(np.float64))
# 5) Voxel downsample
if self.voxel_leaf > 0.0:
pcd = pcd.voxel_down_sample(voxel_size=self.voxel_leaf)
if np.asarray(pcd.points).shape[0] == 0:
out = xyz_to_pointcloud2(np.empty((0, 3), dtype=np.float32), out_frame, stamp=msg.header.stamp)
self.pub.publish(out)
return
# 6) RANSAC 평면 분할 (ground 추정) → inliers 제거
plane_model, inliers = pcd.segment_plane(distance_threshold=self.ransac_dist,
ransac_n=3,
num_iterations=self.ransac_iter)
# plane: ax + by + cz + d = 0
# self.get_logger().debug(f"Plane coeff: {plane_model}")
if len(inliers) > 0:
pcd_no_ground = pcd.select_by_index(inliers, invert=True)
else:
pcd_no_ground = pcd # 평면 못 찾으면 그대로
out_xyz = np.asarray(pcd_no_ground.points).astype(np.float32)
# 7) republish PointCloud2
out_msg = xyz_to_pointcloud2(out_xyz, out_frame, stamp=msg.header.stamp)
self.pub.publish(out_msg)
def main():
rclpy.init()
node = GroundRemoveOpen3D()
try:
rclpy.spin(node)
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()