제가 작성하는 이 글은 제가 공부하기 위해 작성하고 제 학교 후배들에게 조금이라도 ros에 대해서 접하게 하고자 작성하게 되었습니다. 그러니 만약 틀린내용이나 그러니 제가 부족한 부분이 있을수 있으니 주의해주면서 봐주시면 감사하겠습니다 -Happy Lee-

이제 각 노드들간 코드 분석을 해볼것이다.
import random
from study_msg.msg import ArithmeticArgument
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
class Argument(Node):
def __init__(self):
super().__init__('argument')
self.declare_parameter('qos_depth', 10)
qos_depth = self.get_parameter('qos_depth').value
self.declare_parameter('min_random_num', 0)
self.min_random_num = self.get_parameter('min_random_num').value
self.declare_parameter('max_random_num', 9)
self.max_random_num = self.get_parameter('max_random_num').value
self.add_on_set_parameters_callback(self.update_parameter)
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability=QoSDurabilityPolicy.VOLATILE)
self.arithmetic_argument_publisher = self.create_publisher(
ArithmeticArgument,
'arithmetic_argument',
QOS_RKL10V)
self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)
def publish_random_arithmetic_arguments(self):
msg = ArithmeticArgument()
msg.stamp = self.get_clock().now().to_msg()
msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
self.arithmetic_argument_publisher.publish(msg)
self.get_logger().info('Published argument a: {0}'.format(msg.argument_a))
self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))
def update_parameter(self, params):
for param in params:
if param.name == 'min_random_num' and param.type_ == Parameter.Type.INTEGER:
self.min_random_num = param.value
elif param.name == 'max_random_num' and param.type_ == Parameter.Type.INTEGER:
self.max_random_num = param.value
return SetParametersResult(successful=True)
def main(args=None):
rclpy.init(args=args)
try:
argument = Argument()
try:
rclpy.spin(argument)
except KeyboardInterrupt:
argument.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
argument.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
import random
from study_msg.msg import ArithmeticArgument
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
import random : random 패키지 활용
from study_msg.msg import Arithmeimport : 거기에 메시지 타임으로 사용
from rcl_interfaces.msg import SetParametersResult : 파라미터 변경 요청이 성공적으로 이루어졌는지 여부를 나타내는 결과 객체입니다.
rclpy : ROS 2 애플리케이션을 실행하고 노드를 관리하는 핵심 라이브러리입니다.
Parameter : ROS 2 노드의 파라미터를 관리하는 클래스입니다.
QoSProfile : QoS 설정의 전체 프로파일을 정의합니다.
QoSReliabilityPolicy : 메시지를 신뢰성 있게 전달합니다.
QoSHistoryPolicy : 가장 최근 메시지를 저장합니다.
QoSDurabilityPolicy : 메시지가 휘발성임을 의미합니다.
def __init__(self):
super().__init__('argument')
self.declare_parameter('qos_depth', 10)
qos_depth = self.get_parameter('qos_depth').value
self.declare_parameter('min_random_num', 0)
self.min_random_num = self.get_parameter('min_random_num').value
self.declare_parameter('max_random_num', 9)
self.max_random_num = self.get_parameter('max_random_num').value
self.add_on_set_parameters_callback(self.update_parameter)
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability=QoSDurabilityPolicy.VOLATILE)
self.arithmetic_argument_publisher = self.create_publisher(
ArithmeticArgument,
'arithmetic_argument',
QOS_RKL10V)
self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)
super().init('argument') : argument를 노드 이름을 설정한다.
self.declare_parameter('qos_depth', 10) : Qos 자체가 ros2의 통신의 품질을 보장하는 설정으로 버퍼에 저장되는 메시지의 수를 말함 그래서 depth가 크면 손실은 줄어들지만 메모리는 증가하고 작으면 손실 가능성은 커지지만 메모리 사용량은 줄어듬
qos_depth=self.get_parameter('qos_depth').value : qos의 value 값을저장 즉 10을 저장하는 것
그래서 여기서 보면
self.declare_parameter('min_random_num', 0)
self.min_random_num = self.get_parameter('min_random_num').value
self.declare_parameter('max_random_num', 9)
self.max_random_num = self.get_parameter('max_random_num').value
이것들은 다 값을 설정해주는 것이다.
self.add_on_set_parameters_callback(self.update_parameter)
add_on_set_parameters_callback : 파라미터가 동적으로 변경될 때 콜백 함수를 실행하는 기능함
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability=QoSDurabilityPolicy.VOLATILE)
reliability : RELIABLE – 데이터가 수신 측에 확실히 도착하도록 보장함
history : KEEP_LAST – 최근 depth 개의 메시지만 저장
depth : qos_depth로 지정된 값
durability : VOLATILE – 구독자가 연결되었을 때만 메시지를 수신
근데 개인적으로 QoSProfile이라는 것 자체가 너무 심오하다. 이게 통신의 품질을 설정하고 데이터의 신뢰성과 전송 특성을 조절하는데 사용되는 요소로 신뢰성, 이전 데이터 저장, 버퍼 크기, 내구성에 대한 것을 만지게 된다.
self.arithmetic_argument_publisher = self.create_publisher(
ArithmeticArgument,
'arithmetic_argument',
QOS_RKL10V)
여기서 중요하다. self.create_publisher자체가 퍼블리셔를 생성하는 것으로 메시지 타임은ArithmeticArgument고 arithmetic_argument라는 토픽에 메시지를 퍼블리시 합니다. 그리고 Qos는 상단에 내가 만든 Qos를 활용해 사용하게 된다.
self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)
1초 마다 publish_random_arithmetic_arguments라는 콜백 함수를 호출하는 타이머를 생성한다.
def publish_random_arithmetic_arguments(self):
msg = ArithmeticArgument()
msg.stamp = self.get_clock().now().to_msg()
msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
self.arithmetic_argument_publisher.publish(msg)
self.get_logger().info('Published argument a: {0}'.format(msg.argument_a))
self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))
self.get_clock().now().to_msg() : 현재 노드의 시간을 ROS 2 메시지 형식으로 변환
self.arithmetic_argument_publisher.publish(msg) : 메시지를 arithmetic_argument 토픽에 퍼블리시
random.randint(a,b) : a가 최소값이고 b가 최대값이 되는거임
self.get_logger().info : 이거는 printf 라고 생각
def update_parameter(self, params):
for param in params:
if param.name == 'min_random_num' and param.type_ == Parameter.Type.INTEGER:
self.min_random_num = param.value
elif param.name == 'max_random_num' and param.type_ == Parameter.Type.INTEGER:
self.max_random_num = param.value
return SetParametersResult(successful=True)
params에서 각각의 param을 가지고 오면서
param.name == 'minrandom_num' and param.type == Parameter.Type.INTEGER:
이거는 파라미터 이름과 타입이 정수인지 구별하는것이다.
그리고 돌다가 마지막
return SetParametersResult(successful=True)
여기는 파라미터 변경 성공적인 부분이 return 하게된다.
def main(args=None):
rclpy.init(args=args)
try:
argument = Argument()
try:
rclpy.spin(argument)
except KeyboardInterrupt:
argument.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
argument.destroy_node()
finally:
rclpy.shutdown()
rclpy.init(args=args) : ros2에 대해 클라이언트 라이브러리르 초기화 하는것으로 ros2 노드를 실행하기전 바로 호출
argument = Argument() : 클래스의 인스턴스를 생성하여
rclpy.spin(argument) : 이것을 계속 spin을 하는것이다.무한대기를 계속하는것으로 콜백이 발생할때 까지 반복한다.