ROS2 package - Argument (3)

이준혁·2024년 12월 20일

ROS2

목록 보기
7/14

제가 작성하는 이 글은 제가 공부하기 위해 작성하고 제 학교 후배들에게 조금이라도 ros에 대해서 접하게 하고자 작성하게 되었습니다. 그러니 만약 틀린내용이나 그러니 제가 부족한 부분이 있을수 있으니 주의해주면서 봐주시면 감사하겠습니다 -Happy Lee-


각 노드들간에 코드 작성


이제 각 노드들간 코드 분석을 해볼것이다.

3. Argument code

import random

from study_msg.msg import ArithmeticArgument
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy


class Argument(Node):

    def __init__(self):
        super().__init__('argument')
        self.declare_parameter('qos_depth', 10)
        qos_depth = self.get_parameter('qos_depth').value
        self.declare_parameter('min_random_num', 0)
        self.min_random_num = self.get_parameter('min_random_num').value
        self.declare_parameter('max_random_num', 9)
        self.max_random_num = self.get_parameter('max_random_num').value
        self.add_on_set_parameters_callback(self.update_parameter)

        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)

        self.arithmetic_argument_publisher = self.create_publisher(
            ArithmeticArgument,
            'arithmetic_argument',
            QOS_RKL10V)

        self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)

    def publish_random_arithmetic_arguments(self):
        msg = ArithmeticArgument()
        msg.stamp = self.get_clock().now().to_msg()
        msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
        msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
        self.arithmetic_argument_publisher.publish(msg)
        self.get_logger().info('Published argument a: {0}'.format(msg.argument_a))
        self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))

    def update_parameter(self, params):
        for param in params:
            if param.name == 'min_random_num' and param.type_ == Parameter.Type.INTEGER:
                self.min_random_num = param.value
            elif param.name == 'max_random_num' and param.type_ == Parameter.Type.INTEGER:
                self.max_random_num = param.value
        return SetParametersResult(successful=True)


def main(args=None):
    rclpy.init(args=args)
    try:
        argument = Argument()
        try:
            rclpy.spin(argument)
        except KeyboardInterrupt:
            argument.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            argument.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()

3.1 Argument 라이브러리

import random

from study_msg.msg import ArithmeticArgument
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy

import random : random 패키지 활용

from study_msg.msg import Arithmeimport : 거기에 메시지 타임으로 사용

from rcl_interfaces.msg import SetParametersResult : 파라미터 변경 요청이 성공적으로 이루어졌는지 여부를 나타내는 결과 객체입니다.

rclpy : ROS 2 애플리케이션을 실행하고 노드를 관리하는 핵심 라이브러리입니다.

Parameter : ROS 2 노드의 파라미터를 관리하는 클래스입니다.

QoSProfile : QoS 설정의 전체 프로파일을 정의합니다.
QoSReliabilityPolicy : 메시지를 신뢰성 있게 전달합니다.
QoSHistoryPolicy : 가장 최근 메시지를 저장합니다.
QoSDurabilityPolicy : 메시지가 휘발성임을 의미합니다.

3.2 Argument init

    def __init__(self):
        super().__init__('argument')
        self.declare_parameter('qos_depth', 10)
        qos_depth = self.get_parameter('qos_depth').value
        self.declare_parameter('min_random_num', 0)
        self.min_random_num = self.get_parameter('min_random_num').value
        self.declare_parameter('max_random_num', 9)
        self.max_random_num = self.get_parameter('max_random_num').value
        self.add_on_set_parameters_callback(self.update_parameter)

        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)

        self.arithmetic_argument_publisher = self.create_publisher(
            ArithmeticArgument,
            'arithmetic_argument',
            QOS_RKL10V)

        self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)

super().init('argument') : argument를 노드 이름을 설정한다.
self.declare_parameter('qos_depth', 10) : Qos 자체가 ros2의 통신의 품질을 보장하는 설정으로 버퍼에 저장되는 메시지의 수를 말함 그래서 depth가 크면 손실은 줄어들지만 메모리는 증가하고 작으면 손실 가능성은 커지지만 메모리 사용량은 줄어듬
qos_depth=self.get_parameter('qos_depth').value : qos의 value 값을저장 즉 10을 저장하는 것

그래서 여기서 보면

        self.declare_parameter('min_random_num', 0)
        self.min_random_num = self.get_parameter('min_random_num').value
        self.declare_parameter('max_random_num', 9)
        self.max_random_num = self.get_parameter('max_random_num').value

이것들은 다 값을 설정해주는 것이다.

        self.add_on_set_parameters_callback(self.update_parameter)

add_on_set_parameters_callback : 파라미터가 동적으로 변경될 때 콜백 함수를 실행하는 기능함

        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)

reliability : RELIABLE – 데이터가 수신 측에 확실히 도착하도록 보장함
history : KEEP_LAST – 최근 depth 개의 메시지만 저장
depth : qos_depth로 지정된 값
durability : VOLATILE – 구독자가 연결되었을 때만 메시지를 수신

근데 개인적으로 QoSProfile이라는 것 자체가 너무 심오하다. 이게 통신의 품질을 설정하고 데이터의 신뢰성과 전송 특성을 조절하는데 사용되는 요소로 신뢰성, 이전 데이터 저장, 버퍼 크기, 내구성에 대한 것을 만지게 된다.

        self.arithmetic_argument_publisher = self.create_publisher(
            ArithmeticArgument,
            'arithmetic_argument',
            QOS_RKL10V)

여기서 중요하다. self.create_publisher자체가 퍼블리셔를 생성하는 것으로 메시지 타임은ArithmeticArgument고 arithmetic_argument라는 토픽에 메시지를 퍼블리시 합니다. 그리고 Qos는 상단에 내가 만든 Qos를 활용해 사용하게 된다.

        self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)

1초 마다 publish_random_arithmetic_arguments라는 콜백 함수를 호출하는 타이머를 생성한다.


 def publish_random_arithmetic_arguments(self):
        msg = ArithmeticArgument()
        msg.stamp = self.get_clock().now().to_msg()
        msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
        msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
        self.arithmetic_argument_publisher.publish(msg)
        self.get_logger().info('Published argument a: {0}'.format(msg.argument_a))
        self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))

self.get_clock().now().to_msg() : 현재 노드의 시간을 ROS 2 메시지 형식으로 변환
self.arithmetic_argument_publisher.publish(msg) : 메시지를 arithmetic_argument 토픽에 퍼블리시
random.randint(a,b) : a가 최소값이고 b가 최대값이 되는거임
self.get_logger().info : 이거는 printf 라고 생각

    def update_parameter(self, params):
        for param in params:
            if param.name == 'min_random_num' and param.type_ == Parameter.Type.INTEGER:
                self.min_random_num = param.value
            elif param.name == 'max_random_num' and param.type_ == Parameter.Type.INTEGER:
                self.max_random_num = param.value
        return SetParametersResult(successful=True)

params에서 각각의 param을 가지고 오면서
param.name == 'minrandom_num' and param.type == Parameter.Type.INTEGER:
이거는 파라미터 이름과 타입이 정수인지 구별하는것이다.
그리고 돌다가 마지막

        return SetParametersResult(successful=True)

여기는 파라미터 변경 성공적인 부분이 return 하게된다.

def main(args=None):
    rclpy.init(args=args)
    try:
        argument = Argument()
        try:
            rclpy.spin(argument)
        except KeyboardInterrupt:
            argument.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            argument.destroy_node()
    finally:
        rclpy.shutdown()

rclpy.init(args=args) : ros2에 대해 클라이언트 라이브러리르 초기화 하는것으로 ros2 노드를 실행하기전 바로 호출
argument = Argument() : 클래스의 인스턴스를 생성하여
rclpy.spin(argument) : 이것을 계속 spin을 하는것이다.무한대기를 계속하는것으로 콜백이 발생할때 까지 반복한다.

profile
#자기공부 #틀린것도많음 #자기개발 여러분 인생이 힘들다 하더라도 그것을 깨는 순간 큰 희열감으로 옵니다~

0개의 댓글