[ros2] topic pub / sub

About_work·2023년 11월 15일
0

ros2

목록 보기
8/41

1. 간단 개념 정리

1.1. rclpy.init(args=args)

  • ROS 2 노드를 초기화하는 데 사용
  • 이 함수는 ROS 2 프로그램이 시작될 때 최초 한 번 호출되어야 함
  • args 매개변수는 명령줄 인자를 ROS 2 노드에 전달하는 데 사용되며, 대부분의 경우 sys.argv를 통해 전달
  • 이 초기화 과정은 ROS 2 런타임과 통신을 시작하기 전에, 필수적인 설정과 리소스 할당을 수행

1.2. rclpy.spin(node)

  • 지정된 노드를 실행하고, ROS 2 시스템으로부터 메시지를 수신 대기
  • 이 함수는 노드가 콜백을 지속적으로 호출할 수 있도록, 무한 루프 내에서 실행
  • spin 함수는 프로그램이 종료될 때까지 블로킹
  • 새로운 데이터가 도착하거나 이벤트가 발생할 때마다 -> 해당 노드의 콜백 함수를 호출하여 처리

1.3. destroy_node() / rclpy.shutdown() 란?

  • .destroy_node()
    • 노드와 관련된 리소스를 명시적으로 정리
    • 노드를 파괴
    • 노드가 더 이상 필요하지 않을 때 호출
  • rclpy.shutdown()
    • 이 함수는 rclpy 라이브러리의 사용을 종료하고 ROS 2 시스템과의 통신을 정리
    • 프로그램 종료 시 한 번 호출되어야 하며, 모든 노드의 실행이 완료된 후에 실행되어야 함

1.4. from rclpy.node import Node 로 생성된 인스턴스는 독립된 프로세스 안에서 동작해? -> 아니오.

  • Node 인스턴스 자체는 독립된 프로세스로 실행되지 않습니다.
  • Python 스크립트 내에서 생성되며, 이 스크립트를 실행하는 Python 인터프리터 프로세스 내에서 동작
  • ROS 2에서 노드는 추상화된 실행 단위
  • 실제 프로세스 또는 스레드 경계와는 독립적
  • 그 외 정보
    • 다른 스크립트를, 각각의 터미널이 따로 실시하면, 그들은 다른 프로세스에서 구동됨.

2. create_publisher / create_subscription 메서드의 파라미터

2.1. create_publisher 메서드

  1. msg_type: 발행할 메시지의 타입을 지정
  • 이는 메시지 정의에서 사용된 클래스 타입이어야 합니다.
  • 예를 들어, std_msgs.msg.String은 표준 문자열 메시지 타입
  1. topic: 메시지가 발행될 토픽의 이름
  • 구독자는 이 토픽 이름을 사용하여 메시지를 수신
  1. qos_profile: 메시지의 품질 보증(Quality of Service, QoS) 설정을 지정
  • ROS 2는 다양한 네트워크 조건과 통신 요구 사항을 충족시키기 위해 여러 QoS 정책을 제공
  • 예를 들어, 신뢰할 수 있는 통신이 필요한 경우, 메시지의 저장 기간을 설정하거나, 데이터의 신뢰성을 보장하기 위한 정책을 설정
  • qos_profilerclpy.qos.QoSProfile 객체를 사용하여 정의됨
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
from multiprocessing import Process
import sys

class ImageProcessor(Node):
    def __init__(self):
        super().__init__('image_processor')
        self.subscription = self.create_subscription(
            Image,
            'raw_image',
            self.listener_callback,
            10)
        self.publisher = self.create_publisher(
            Image,
            'processed_image',
            10)
        self.bridge = CvBridge()
        self.timer = self.create_timer(0.5, self.timer_callback)  # 0.5초마다 호출
        self.last_image = None

    def listener_callback(self, msg):
        try:
            cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
            self.last_image = cv_image
        except Exception as e:
            self.get_logger().error('Failed to convert image: %r' % (e,))

    def timer_callback(self):
        if self.last_image is not None:
            processed_image = cv2.cvtColor(self.last_image, cv2.COLOR_BGR2GRAY)
            processed_image = cv2.cvtColor(processed_image, cv2.COLOR_GRAY2BGR)
            processed_image[:] = 0
            try:
                ros_image = self.bridge.cv2_to_imgmsg(processed_image, "bgr8")
                self.publisher.publish(ros_image)
                self.get_logger().info('Processed image published')
            except Exception as e:
                self.get_logger().error('Failed to convert image: %r' % (e,))

def run_node():
    rclpy.init()
    image_processor = ImageProcessor()
    rclpy.spin(image_processor)
    image_processor.destroy_node()
    rclpy.shutdown()

def main():
    if len(sys.argv) > 1 and sys.argv[1] == 'process':
        # 별도 프로세스에서 노드 실행
        process = Process(target=run_node)
        process.start()
        process.join()
    else:
        # 현재 프로세스에서 노드 실행
        run_node()

if __name__ == '__main__':
    main()

2.2. create_subscription 메서드

  1. msg_type: 구독할 메시지의 타입을 지정
  • 이는 메시지 정의에서 사용된 클래스 타입
  • 예를 들어, std_msgs.msg.String은 표준 문자열 메시지 타입
  1. topic: 구독할 토픽의 이름

  2. callback: 토픽에 대한 메시지를 수신할 때마다 호출될 콜백 함수

  • 이 함수는 수신된 메시지를 인자로 받아 처리합니다.
  1. qos_profile: 메시지의 품질 보증(QoS) 설정을 지정
  • rclpy.qos.QoSProfile 객체를 사용하여 정의
  1. callback_group (선택적):
  • 콜백을 실행할 콜백 그룹을 지정
  • 이는 고급 사용자를 위한 옵션으로, 특정 콜백 그룹에 콜백을 할당하여 실행 방식을 제어할 수 있음.
  1. event_callbacks (선택적):
  • 구독 이벤트에 대한 콜백을 설정하는 데 사용
  • 이는 구독의 수명 주기 동안 발생할 수 있는 다양한 이벤트(예: 메시지 수신, 구독 매칭 등)에 대한 사용자 정의 콜백을 지정할 수 있게 해줌.
import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class MySubscriber(Node):
    def __init__(self):
        super().__init__('my_subscriber')
        self.subscription = self.create_subscription(
            String,
            'topic',
            self.listener_callback,
            10)
        self.subscription  # prevent unused variable warning

    def listener_callback(self, msg):
        self.get_logger().info('I heard: "%s"' % msg.data)

def main(args=None):
    rclpy.init(args=args)
    my_subscriber = MySubscriber()
    rclpy.spin(my_subscriber)
    my_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

3. qos_profile 파라미터

  • create_publisher 함수의 인자 중 queue_size는 ROS 1에서 사용되었으나, ROS 2에서는 qos_profile로 대체되었습니다.
  • qos_profile 인자를 사용하여 메시지의 메시지의 신뢰성, 내구성, 전달 속도 등과 관련된 다양한 속성이 포함
  • 개요
    • 사용자는 이러한 표준 프로파일을 그대로 사용하거나, 필요에 따라 개별 QoS 설정을 조정하여 커스텀 프로파일을 생성할 수도 있습니다.
    • 특정 애플리케이션의 요구사항에 맞게 통신을 최적화할 수 있습니다.
      • 예를 들어, 실시간으로 중요한 데이터를 다루는 시스템에서는 RELIABLETRANSIENT_LOCAL 설정이 중요
      • 빠른 성능이 필요한 시스템에서는 BEST_EFFORTVOLATILE 설정이 더 적합

ROS 1

  • 큐 크기 (queue_size):
    • 퍼블리셔의 메시지 큐의 크기를 정의
      • 큐 크기는 퍼블리셔가 네트워크를 통해 메시지를 전송하기 전에, 보유할 수 있는 메시지의 최대 개수를 결정
    • 예를 들어, queue_size=10은 최대 10개의 메시지를 큐에 보관할 수 있다는 것을 의미

ROS 2: qos_profile

기본 QoS 프로파일: from rclpy.qos import QoSPresetProfiles

import rclpy
from rclpy.qos import QoSPresetProfiles
from std_msgs.msg import String


def main():
    rclpy.init()

    node = rclpy.create_node('qos_example_node')

    # SYSTEM_DEFAULT QoS 프로파일 사용
    qos_profile_system_default = QoSPresetProfiles.SYSTEM_DEFAULT

    # SENSOR_DATA QoS 프로파일 사용
    qos_profile_sensor_data = QoSPresetProfiles.SENSOR_DATA

    # 퍼블리셔 생성
    publisher_system_default = node.create_publisher(
        String, 'system_default_topic', qos_profile_system_default)
    publisher_sensor_data = node.create_publisher(String, 'sensor_data_topic',
                                                  qos_profile_sensor_data)

    # 메시지 생성
    msg = String()
    msg.data = 'Hello, World!'

    # 일부 코드에서는 여기서 메시지를 퍼블리시하여 사용할 수 있습니다.

    # 노드 종료
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()
  1. System Default (시스템 기본값):

    • 시스템이나 미들웨어에 의해 정의된 기본 QoS 설정을 사용
    • qos_profile=rclpy.qos.QoSPresetProfiles.SYSTEM_DEFAULT
  2. Sensor Data (센서 데이터):

    • 높은 데이터 속도가 필요한 센서 데이터 전송에 적합
    • 낮은 신뢰성과 함께 데이터의 신속한 전달을 중시
    • 늦게 도착하는 메시지는 무시될 수 있으며, 항상 최신 데이터를 전송하는 것이 중요
    • qos_profile=rclpy.qos.QoSPresetProfiles.SENSOR_DATA
  3. Parameters (파라미터):

    • 파라미터 이벤트와 같이 신뢰성이 중요한 경우에 사용
    • 높은 신뢰성과, 보다 느린 전송 속도
    • qos_profile=rclpy.qos.QoSPresetProfiles.PARAMETERS
  4. Services (서비스):

    • 서비스 요청과 응답에 사용
    • 요청에 대한 응답이 보장되어야 하므로 높은 신뢰성
    • qos_profile=rclpy.qos.QoSPresetProfiles.SERVICES_DEFAULT
  5. Parameter Events (파라미터 이벤트):

    • 파라미터 변경에 대한 알림에 사용
    • 시스템 구성의 변경을 감지하고 반응할 필요가 있을 때 사용.
    • 신속한 전달과 높은 신뢰성
    • qos_profile=rclpy.qos.QoSPresetProfiles.PARAMETER_EVENTS

rclpy.qos.QoSProfile / QoSReliabilityPolicy: QoS 설정 커스터마이징

  • QoS 설정을 커스터마이징할 때는 rclpy.qos.QoSProfile을 사용하여 필요한 옵션을 조정할 수 있습니다. 예를 들어:
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy

custom_qos_profile = QoSProfile(
    reliability=QoSReliabilityPolicy.RELIABLE,
    depth=10
)
  • 이 코드는 신뢰할 수 있는 전송을 위해 RELIABLE 정책을 사용하고, 큐의 크기를 10으로 설정합니다.
  1. 신뢰성(Reliability):
    • 신뢰성 설정은 메시지가 신뢰성 있게 전달되어야 하는지 여부를 결정합니다.
    • RELIABLE은 모든 메시지가 도착할 것으로 예상되며, 네트워크 문제가 발생하면 재전송을 시도
    • BEST_EFFORT는 높은 성능을 위해 신뢰성을 희생할 수 있으며, 메시지 손실이 발생할 수 있음
from rclpy.qos import QoSProfile
from rclpy.qos import ReliabilityPolicy

qos_profile = QoSProfile(
    reliability=ReliabilityPolicy.RELIABLE
)

  1. 내구성(Durability):
    • 내구성은 메시지가 얼마나 오래 저장되어야 하는지 정의
    • TRANSIENT_LOCAL은 메시지가 노드의 생명주기를 넘어서 지속되도록 합니다. 새로운 구독자가 연결될 때 과거의 메시지를 받을 수 있습니다.
    • VOLATILE은 메시지가 오래 보관되지 않고 빠르게 사라질 것임을 의미
from rclpy.qos import DurabilityPolicy

qos_profile = QoSProfile(
    durability=DurabilityPolicy.TRANSIENT_LOCAL
)

  1. 전달 지연(Deadline):
    • 전달 지연은 메시지가 얼마나 자주 발행되어야 하는지를 나타냅니다.
    • 이 설정은 퍼블리셔가 정해진 시간 내에 메시지를 발행하지 않으면 시스템이 경고를 받을 수 있도록 합니다.
from rclpy.qos import DeadlinePolicy

qos_profile = QoSProfile(
    deadline=DeadlinePolicy(rclpy.duration.Duration(seconds=2))
)

  1. 수명(Liveliness):
    • 수명은 퍼블리셔가 여전히 살아있고 활동적인지를 나타냅니다.
    • 이는 퍼블리셔가 정기적으로 신호를 보내 활동을 확인함으로써 구독자가 퍼블리셔의 상태를 파악할 수 있게 합니다.
from rclpy.qos import LivelinessPolicy

qos_profile = QoSProfile(
    liveliness=LivelinessPolicy.AUTOMATIC
)

  1. 수명(Lifespan):
    • 수명은 메시지가 유효한 기간을 정의
    • 이 기간이 지나면 메시지는 더 이상 유효하지 않게 됩니다.
from rclpy.qos import LifespanPolicy

qos_profile = QoSProfile(
    lifespan=LifespanPolicy(rclpy.duration.Duration(seconds=5))
)

  1. 히스토리(History) 및 큐 크기(Queue Size):
    • 히스토리 설정은 얼마나 많은 메시지가 저장될지를 결정
    • KEEP_LAST는 최근 메시지만 저장하며, 큐 크기는 저장할 메시지의 수를 정의합니다.
    • KEEP_ALL은 모든 메시지를 저장하려고 시도합니다.
from rclpy.qos import HistoryPolicy

qos_profile = QoSProfile(
    history=HistoryPolicy.KEEP_LAST,
    depth=10  # 큐 크기와 함께 사용됩니다.
)

profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글