[ros2] topic subscribe 시, 원하는 시간의 timestamp 데이터를 불러오는 방법

About_work·2024년 4월 28일
0

ros2

목록 보기
38/41

1. 예시 코드

import rclpy
from rclpy.node import Node
from std_msgs.msg import String
from rclpy.time import Time
from typing import List, Tuple, Optional
import numpy as np

class ClosestMessageSubscriber(Node):
    """특정 토픽의 메시지를 수신하여 버퍼에 저장하고, 요청된 타임스탬프에 가장 가까운 메시지를 검색하는 노드.

    Attributes:
        subscription (Subscription): ROS 2 subscription 객체.
        buffer (List[Tuple[Time, String]]): 수신된 메시지와 타임스탬프를 저장하는 버퍼.
        buffer_size (int): 버퍼의 최대 크기.
    """
    def __init__(self):
        """노드 초기화 및 구독자 설정."""
        super().__init__('closest_message_subscriber')
        self.subscription = self.create_subscription(
            String,
            'chatter',
            self.listener_callback,
            10
        )
        self.buffer: List[Tuple[Time, String]] = []
        self.buffer_size: int = 100  # 버퍼 사이즈를 100개 메시지로 설정

    def listener_callback(self, msg: String) -> None:
        """토픽의 메시지를 받아 버퍼에 저장하는 콜백 함수.

        Args:
            msg (String): 수신된 메시지.
        """
        # 버퍼 관리: 오래된 메시지 제거
        if len(self.buffer) >= self.buffer_size:
            self.buffer.pop(0)
        # 메시지 저장
        self.buffer.append((self.get_clock().now(), msg))

    def get_closest_message(self, target_time: Time, time_gap_threshold: float) -> Optional[String]:
        """요청된 타임스탬프와 가장 근접한 메시지를 검색합니다.

        Args:
            target_time (Time): 찾고자 하는 타임스탬프.
            time_gap_threshold (float): 허용되는 최대 시간 차이 (나노초 단위).

        Returns:
            Optional[String]: 가장 근접한 타임스탬프의 메시지. 조건을 만족하는 메시지가 없으면 None을 반환합니다.
        """
        if not self.buffer:
            return None

        # 시간 차이 계산
        timestamps = np.array([t[0].nanoseconds for t in self.buffer])
        target_ns = target_time.nanoseconds
        time_diffs = np.abs(timestamps - target_ns)

        # 조건을 만족하는 최소 시간 차이 찾기
        min_diff_index = np.argmin(time_diffs)
        min_diff = time_diffs[min_diff_index]

        if min_diff <= time_gap_threshold:
            return self.buffer[min_diff_index][1]
        else:
            return None

def main(args=None) -> None:
    """메인 함수: 노드를 초기화하고 실행합니다."""
    rclpy.init(args=args)
    node = ClosestMessageSubscriber()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

2. rclcpp/rclpy Time Source 및 Time Synchronization

  • ROS 2에서는 시간 동기화를 위해 Time Source와 같은 메커니즘을 제공합니다.
  • 이를 통해 메시지의 헤더에 있는 타임스탬프를 이용하여 데이터를 동기화할 수 있습니다.
  • 예를 들어, message_filters 라이브러리를 사용하여 여러 토픽의 메시지를 시간에 따라 동기화할 수 있습니다.
  • 하지만 이는 여러 데이터 소스를 동시에 처리할 때 유용하며, 특정 시점의 데이터를 검색하는 직접적인 방법은 제공하지 않습니다.

  • message_filters 라이브러리를 사용하여 ROS 2에서 여러 토픽의 메시지를 시간에 따라 동기화하는 방법을 설명하겠습니다.
  • 이 기능은 특히 여러 센서로부터 데이터를 수집하고 동기화해야 할 때 매우 유용합니다.
  • 예를 들어, 로봇이 카메라와 라이다를 사용할 때, 두 센서의 데이터를 시간적으로 정확하게 매칭시켜야 하는 상황에서 사용할 수 있습니다.

2.1. Python에서 message_filters 사용하기

message_filters를 사용하는 기본적인 단계는 다음과 같습니다:

  1. 필요한 라이브러리 임포트
  2. 메시지 필터 생성
  3. 시간 동기화 설정
  4. 콜백 함수 정의
  5. 필터에 콜백 연결

아래는 두 개의 토픽 (/camera/image/lidar/scan)에서 메시지를 동기화하는 예제 코드입니다.

import rclpy
from rclpy.node import Node
from message_filters import ApproximateTimeSynchronizer, Subscriber
from sensor_msgs.msg import Image
from sensor_msgs.msg import LaserScan

class SensorFusionNode(Node):
    def __init__(self):
        super().__init__('sensor_fusion_node')
        # 메시지 필터 생성
        self.image_sub = Subscriber(self, Image, '/camera/image')
        self.lidar_sub = Subscriber(self, LaserScan, '/lidar/scan')

        # 시간 동기화 설정
        self.ats = ApproximateTimeSynchronizer([self.image_sub, self.lidar_sub], queue_size=10, slop=0.1)
        self.ats.registerCallback(self.callback)

    def callback(self, image, scan):
        # 동기화된 메시지 처리
        self.get_logger().info(f"Received synchronized messages: {image.header.stamp} {scan.header.stamp}")

def main(args=None):
    rclpy.init(args=args)
    node = SensorFusionNode()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

코드 설명

  • Subscriber 객체 생성: 각 토픽에 대한 Subscriber 객체를 생성합니다.
  • ApproximateTimeSynchronizer 설정:
    • ApproximateTimeSynchronizer를 사용하여 두 토픽의 메시지를 동기화
    • queue_size는 동기화를 위해 버퍼링할 메시지의 수를 정의하고,
    • slop는 메시지 간의 최대 허용 시간 차이를 초 단위로 설정
  • 콜백 함수 연결:
    • 동기화된 메시지가 도착할 때마다 실행될 콜백 함수를 정의하고 등록
  • message_filtersApproximateTimeSynchronizer 외에도 TimeSynchronizer (엄격한 동기화가 필요할 때) 등 다양한 동기화 옵션을 제공합니다.
  • 이러한 도구를 사용하면 센서 데이터의 통합 및 분석이 훨씬 용이해집니다.

3. TF2 Buffer와 Listener 사용

  • tf2_ros 패키지는 변환 데이터를 버퍼링하고, 이 데이터에 대한 조회를 허용
  • TF2를 사용하면 특정 시간에 대한 변환 데이터를 요청할 수 있음
  • 이 방법은 주로 좌표 변환에 사용되지만, 개념적으로 메시지 데이터에도 유사한 버퍼링 및 조회 메커니즘을 적용할 수 있음
  • tf2_ros를 사용하여 ROS 2에서 특정 시간에 대한 변환 데이터를 요청하는 방법을 설명
  • 이를 통해 로봇의 서로 다른 센서와 액추에이터 간의 상대적 위치를 특정 시간에 맞춰 계산할 수 있음

3.1. 기본 개념

  • tf2_ros는 좌표 변환을 위한 시스템
  • BufferTransformListener를 사용하여 변환을 저장하고, 이러한 변환에 접근하여 필요한 계산을 수행할 수 있음

3.2. 코드 구현

  1. 노드 생성 및 tf2_ros.Buffer 초기화
  • 먼저, ROS 2 노드를 생성하고 tf2_ros.Buffer 객체를 초기화합니다. Buffer 객체는 변환 데이터를 저장
  1. tf2_ros.TransformListener 사용

    • TransformListenerBuffer에 데이터를 자동으로 저장
    • 이를 통해 실시간으로 변환 데이터를 수신하고 저장
  2. 변환 요청

    • 특정 시간에 대한 변환을 요청하려면, Buffer 객체의 lookup_transform 메소드를 사용
    • 이 메소드는 대상 프레임, 소스 프레임, 그리고 조회하고자 하는 시간을 인자로 받음

Python 예제 코드

  • 아래는 두 프레임 간의 변환을 특정 시간에 조회하는 ROS 2 Python 노드의 예제입니다.
import rclpy
from rclpy.node import Node
from tf2_ros import Buffer, TransformListener
from tf2_ros import LookupException, ConnectivityException, ExtrapolationException
from geometry_msgs.msg import TransformStamped

class TransformListenerNode(Node):
    def __init__(self):
        super().__init__('transform_listener_node')
        self.tf_buffer = Buffer()
        self.listener = TransformListener(self.tf_buffer, self)

    def get_transform(self, target_frame: str, source_frame: str, time):
        try:
            # 시간을 Time 형식으로 변환 (rclpy.time.Time)
            when = rclpy.time.Time(seconds=time)
            # 특정 시간에 대한 변환 요청
            transform = self.tf_buffer.lookup_transform(target_frame, source_frame, when)
            return transform
        except (LookupException, ConnectivityException, ExtrapolationException) as e:
            self.get_logger().error(f'Error getting transform: {str(e)}')
            return None

def main(args=None):
    rclpy.init(args=args)
    node = TransformListenerNode()
    try:
        # 예를 들어, 'base_link'에서 'camera_link'로의 변환을 요청
        transform = node.get_transform('base_link', 'camera_link', 1620303662.0)
        if transform:
            node.get_logger().info(f'Transform: {transform}')
    finally:
        node.destroy_node()
        rclpy.shutdown()

if __name__ == '__main__':
    main()

주의사항

  • 요청하는 변환의 시간은 로봇의 시간과 동기화되어 있어야 하며, Buffer에 저장된 시간 범위 내에 있어야 합니다.
  • 예외 처리는 변환을 찾을 수 없거나 데이터가 충분하지 않을 때 발생할 수 있는 오류를 처리합니다.
profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글