ROS # 10 - 패키지 설계

남생이·2024년 10월 22일

ROS

목록 보기
10/28

1. Design Package

  • ROS2의 토픽, 서비스, 액션 프로그래밍을 이용해 각 노드들이 서로 연동되어 구동하는 패키지 설계
  • 프로세스를 목적별로 나누어 노드 단위의 프로그램을 작성하고 노드와 노드 간의 데이터 통신을 고려하여 설계

2. Feature to be Implemented

계산기 설계하기

  • 현재 시간과 변수 a,b를 받아 연산하여 결과값 도출
  • 연산 결과값을 누적하여 목표치에 도달 시 결과값을 표시

3. Process

  1. argument node : arithmetic_argument 토픽으로 현재 시간과 변수 a,b를 퍼블리시

  2. calculator node : arithmetic_argumnet 토픽이 생성된 시간과 변수 a와 b를 서브스크라이브

  3. operator node : arithmetic_argumnet 서비스를 통해 calculator 노드에게 연산자(+,-,*,/)를 서비스 요청값으로 전송

  4. calculator node

    • 서브스크라이브하여 저장하고 있는 변수 a,b를 operator 노드로부터 서비스 요청값으로 받은 연산자를 이용하여 연산
    • operator 노드에게 결과값을 arithmetic_operator 이름으로 서비스 응답값 전송
    • checker 노드에게 액션 목표값(action goal)을 받은 후부터 저장된 변수(a,b,연산자)를 가지고 연산한 값을 합함
    • 연산이 끝난 계산식을 arithmetic_checker 이름으로 액션 피드백(action feedback)을 checker 노드로 전송
    • 연산값의 합이 액션 목표값을 넘기면 최종 연산 합계를 arithmetic_checker 이름으로 액션 결과값(action result)를 checker node로 전송
  5. checker node : 연산값의 합계 한계치를 arithmetic_checker 액션 이름으로 액션 목표값으로 전달


4. Edit files

1. pkg create

cd workspace/src
ros2 pkg create --build-type ament_python [pkg_name]
cd -
rosdep install --from-paths src --ignore-src -r- y

2. interface pkg 수정

이전 ROS2 #7 에서 만든 ros_study_msgs 패키지의 msg, action ,srv에 Arithmetic 파일을 각각 생성

  • *아래의 파일들은 변수와 데이터 구조의 형식을 지정하며 구체적인 수치나 값은 publisher, subscriber, node에 해당하는 코드에서 설정한다.

cd workspace/src/ros_study_msgs
mkdir action msg srv

ArithmeticArgument.msg

ROS2에서 사용하는 메시지 타입을 정의하는 파일, 두개의 소수점 인자(a,b)와 메시지 생성 시간을 발행 및 수신하는데 사용됩니다.

# ArithmeticArgument.msg
# messages
builtin_interfaces/Time stamp # 메시지 생성 시점
float32 argument_a
float32 argument_b

AritmeticOperator.srv

ROS2에서 사용하는 Service type을 정의하는 파일, 산술 연산을 위해 요청가능한 연산자와 그 결과를 반환하는 구조를 포함

# ArithmeticOperator.srv
# Constants
int8 PLUS = 1
int8 MINUS = 2
int8 MULTIPLY = 3
int8 DIVISION = 4

# Request: 클라이언트가 서비스에 요청을 보낼 때 포함되는 데이터 구조
int8 arithmetic_operator # 산술 연산자
---
# Response: 클라이언트에 응답할 때 포함되는 데이터 구조 정의
float32 arithmetic_result # 계산 결과 반환을 위한 값

ArithmeticChecker.action

ROS2에서 사용되는 action interface를 정의하는 파일, publisher, subscriber와 상태 피드백을 할 수 있는 기능을 제공

# ArithmeticChecker.action
# goal: 클라이언트가 액션서버에 전달할 목표 데이터
float32 goal_sum # 목표 합계
---
# Result: 클라이언트에 응답할 때 포함되는 데이터 구조
string[] all_formula # 사용된 모든 수식
float32 total_sum # 계산된 총합계
---
# Feedback: 액션 서버가 클라이언트에 전달하는 피드백 데이터
string[] formula # 진행 중인 수식

3. Topic interface

Topic publisher code - argument.py

import random  # create random number library
from ros_study_msgs.msg import ArithmeticArgument # msg type file import
# rcl_interfaces.msg: basic interface msg type of ros2
from rcl_interfaces.msg import SetParametersResult # parameter setting result
import rclpy
from rclpy.node import Node
# define Qos profile
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy

# Argument class: Inherit Node class and define ROS2 Node (name:argument)
class Argument(Node):
    def __init__(self):
        super().__init__('argument')
        
        '''parameters
        - qos_depth: message que depth of QoS setting 
        - min/max_random_num: set the random number min and max value 
        '''
        self.declare_parameter('qos_depth', 10) # declare parameter
        qos_depth = self.get_parameter('qos_depth').value
        
        self.declare_parameter('min_random_num', 1)
        self.min_random_num = self.get_parameter('min_random_num').value
        
        self.declare_parameter('max_random_num', 10)
        self.max_random_num = self.get_parameter('max_random_num').value
        
        ''' Qos profile setting
        - QOS_RKL1OV: Set QOS profile about msg transmission method 
        - RELIABLE: msg reliable
        - KEEP_LAST: keep last msg
        - depth: depth of msg que
        - VOLATILE: disable message persistence
        '''
        QOS_RKL10V=QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability = QoSDurabilityPolicy.VOLATILE)
        
        # self.publisher_name = self.create_publisher(MessageType,'topic_name',qos_profile)
        self.arithmetic_argument_publisher = self.create_publisher(
        ArithmeticArgument,
        'arithmetic_argument',
        QOS_RKL10V)
        
        # create timer and publish msg
        self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)
        
        # Add parameter update callback
        self.add_on_set_parameters_callback(self.update_parameter)
        
        
    # publish random msg
    def publish_random_arithmetic_arguments(self):
        msg=ArithmeticArgument() # create object
        msg.stamp=self.get_clock().now().to_msg() # save time
        msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
        msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
        self.arithmetic_argument_publisher.publish(msg)
        self.get_logger().info('Published argument a: {0}'.format(msg.argument_a))
        self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))
        
    # Update parameters dynamically
    def update_parameter(self, params):
        for param in params:
            if param.name == 'min_random_num' and param.type_ == param.Type.INTEGER:
                self.min_random_num = param.value
            elif param.name == 'max_random_num' and param.type_ == param.Type.INTEGER:
                self.max_random_num = param.value
        return SetParametersResult(successful=True)
        
def main(args=None):
    rclpy.init(args=args)
    try:
        argument = Argument()
        try:
            rclpy.spin(argument)
        except KeyboardInterrupt:
            argument.get_logger().info('Keyboard Interupt (SIGINT)')
        finally:
            argument.destroy_node()
    finally:
        rclpy.shutdown()
            
if __name__ =='__main__':
    main()

자세한 코드 분석은 다음 #9에서 진행하도록 하겠다.

profile
공부하는 거북이

0개의 댓글