
계산기 설계하기
- 현재 시간과 변수 a,b를 받아 연산하여 결과값 도출
- 연산 결과값을 누적하여 목표치에 도달 시 결과값을 표시
argument node : arithmetic_argument 토픽으로 현재 시간과 변수 a,b를 퍼블리시
calculator node : arithmetic_argumnet 토픽이 생성된 시간과 변수 a와 b를 서브스크라이브
operator node : arithmetic_argumnet 서비스를 통해 calculator 노드에게 연산자(+,-,*,/)를 서비스 요청값으로 전송
calculator node
checker node : 연산값의 합계 한계치를 arithmetic_checker 액션 이름으로 액션 목표값으로 전달

cd workspace/src
ros2 pkg create --build-type ament_python [pkg_name]
cd -
rosdep install --from-paths src --ignore-src -r- y
이전 ROS2 #7 에서 만든 ros_study_msgs 패키지의 msg, action ,srv에 Arithmetic 파일을 각각 생성

cd workspace/src/ros_study_msgs
mkdir action msg srv
ROS2에서 사용하는 메시지 타입을 정의하는 파일, 두개의 소수점 인자(a,b)와 메시지 생성 시간을 발행 및 수신하는데 사용됩니다.
# ArithmeticArgument.msg
# messages
builtin_interfaces/Time stamp # 메시지 생성 시점
float32 argument_a
float32 argument_b
ROS2에서 사용하는 Service type을 정의하는 파일, 산술 연산을 위해 요청가능한 연산자와 그 결과를 반환하는 구조를 포함
# ArithmeticOperator.srv
# Constants
int8 PLUS = 1
int8 MINUS = 2
int8 MULTIPLY = 3
int8 DIVISION = 4
# Request: 클라이언트가 서비스에 요청을 보낼 때 포함되는 데이터 구조
int8 arithmetic_operator # 산술 연산자
---
# Response: 클라이언트에 응답할 때 포함되는 데이터 구조 정의
float32 arithmetic_result # 계산 결과 반환을 위한 값
ROS2에서 사용되는 action interface를 정의하는 파일, publisher, subscriber와 상태 피드백을 할 수 있는 기능을 제공
# ArithmeticChecker.action
# goal: 클라이언트가 액션서버에 전달할 목표 데이터
float32 goal_sum # 목표 합계
---
# Result: 클라이언트에 응답할 때 포함되는 데이터 구조
string[] all_formula # 사용된 모든 수식
float32 total_sum # 계산된 총합계
---
# Feedback: 액션 서버가 클라이언트에 전달하는 피드백 데이터
string[] formula # 진행 중인 수식

import random # create random number library
from ros_study_msgs.msg import ArithmeticArgument # msg type file import
# rcl_interfaces.msg: basic interface msg type of ros2
from rcl_interfaces.msg import SetParametersResult # parameter setting result
import rclpy
from rclpy.node import Node
# define Qos profile
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
# Argument class: Inherit Node class and define ROS2 Node (name:argument)
class Argument(Node):
def __init__(self):
super().__init__('argument')
'''parameters
- qos_depth: message que depth of QoS setting
- min/max_random_num: set the random number min and max value
'''
self.declare_parameter('qos_depth', 10) # declare parameter
qos_depth = self.get_parameter('qos_depth').value
self.declare_parameter('min_random_num', 1)
self.min_random_num = self.get_parameter('min_random_num').value
self.declare_parameter('max_random_num', 10)
self.max_random_num = self.get_parameter('max_random_num').value
''' Qos profile setting
- QOS_RKL1OV: Set QOS profile about msg transmission method
- RELIABLE: msg reliable
- KEEP_LAST: keep last msg
- depth: depth of msg que
- VOLATILE: disable message persistence
'''
QOS_RKL10V=QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability = QoSDurabilityPolicy.VOLATILE)
# self.publisher_name = self.create_publisher(MessageType,'topic_name',qos_profile)
self.arithmetic_argument_publisher = self.create_publisher(
ArithmeticArgument,
'arithmetic_argument',
QOS_RKL10V)
# create timer and publish msg
self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)
# Add parameter update callback
self.add_on_set_parameters_callback(self.update_parameter)
# publish random msg
def publish_random_arithmetic_arguments(self):
msg=ArithmeticArgument() # create object
msg.stamp=self.get_clock().now().to_msg() # save time
msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
self.arithmetic_argument_publisher.publish(msg)
self.get_logger().info('Published argument a: {0}'.format(msg.argument_a))
self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))
# Update parameters dynamically
def update_parameter(self, params):
for param in params:
if param.name == 'min_random_num' and param.type_ == param.Type.INTEGER:
self.min_random_num = param.value
elif param.name == 'max_random_num' and param.type_ == param.Type.INTEGER:
self.max_random_num = param.value
return SetParametersResult(successful=True)
def main(args=None):
rclpy.init(args=args)
try:
argument = Argument()
try:
rclpy.spin(argument)
except KeyboardInterrupt:
argument.get_logger().info('Keyboard Interupt (SIGINT)')
finally:
argument.destroy_node()
finally:
rclpy.shutdown()
if __name__ =='__main__':
main()
자세한 코드 분석은 다음 #9에서 진행하도록 하겠다.