[ROS2] PointCloud2 to pcd

OpenJR·2025년 1월 16일

ROS2는 pcl_ros에서 bag_to_pcd를 지원안한다...
그래서 직접 짜야한다...그래서 짯다..

import sys
import rosbag2_py
import open3d as o3d

from pathlib import Path
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

from sensor_msgs_py.point_cloud2 import read_points


def get_rosbag_options(path, serialization_format='cdr'):
    storage_options = rosbag2_py.StorageOptions(uri=path, storage_id='sqlite3')

    converter_options = rosbag2_py.ConverterOptions(
        input_serialization_format=serialization_format,
        output_serialization_format=serialization_format)

    return storage_options, converter_options


if len(sys.argv) < 3:
    print("Usage: python3 bag_to_pcd.py <bagfile> <pc_topic>")
    sys.exit(1)

bag_path = Path(sys.argv[1])
pcd_dir = bag_path.parent / "pcd"
if not pcd_dir.exists():
    pcd_dir.mkdir()
print(f"Converting {bag_path.absolute()} to pcd files...")
pc_topic = sys.argv[2]

reader = rosbag2_py.SequentialReader()
storage_options, converter_options = get_rosbag_options(str(bag_path))
reader.open(storage_options, converter_options)

topic_types = reader.get_all_topics_and_types()
type_map = {topic_types[i].name: topic_types[i].type for i in range(len(topic_types))}
fields = ('x', 'y', 'z', 'intensity', 'ring', 'time')

total_pcd = o3d.geometry.PointCloud()
while reader.has_next():
    (topic, data, t) = reader.read_next()
    if topic != pc_topic:
        continue
    points = []
    
    msg_type = get_message(type_map[topic])
    msg = deserialize_message(data, msg_type)
    ros_points = read_points(msg, field_names=fields, skip_nans=True)
    for p in ros_points:
        points.append([p[0], p[1], p[2]])
    
    pcd = o3d.geometry.PointCloud()
    pcd.points = o3d.utility.Vector3dVector(points)
    o3d.io.write_point_cloud(f"{pcd_dir}/{t}.pcd", pcd)
    total_pcd += pcd
    print(f"Saved {pcd_dir}/{t}.pcd")
o3d.io.write_point_cloud(f"total.pcd", total_pcd)
    

import rosbag2_py에서 에러가 나는 사람이 있을 수도 있는데 그건 foxy거나.. 더 이전..일 가능성이 있다.
https://github.com/ros2/rosbag2 에서 확실하게 ROS버전을 확인한 후 rosbag2_py가 포함 하는지 확인해야 한다. 없으면 직접 빌드해서 사용하면 된다.
https://github.com/ros2/rosbag2/tree/foxy 를 보면 rosbag2_py가 없다..

그래서 foxy-future로 설치해야한다.

profile
Jacob

0개의 댓글