제가 작성하는 이 글은 제가 공부하기 위해 작성하고 제 학교 후배들에게 조금이라도 ros에 대해서 접하게 하고자 작성하게 되었습니다. 그러니 만약 틀린내용이나 그러니 제가 부족한 부분이 있을수 있으니 주의해주면서 봐주시면 감사하겠습니다 -Happy Lee-

마지막 calculator 부분이다. 이 부분 같은경우 이제 argument, operator 그리고 checker 부분에 노드들에 있는 메시지들을 받아 중간의 역할을 하는 것인데 뭔가 Ros1을 비교한다면 roscore를 생각하면 된다.
import time
from study_msg.action import ArithmeticChecker
from study_msg.msg import ArithmeticArgument
from study_msg.srv import ArithmeticOperator
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
class Calculator(Node):
def __init__(self):
super().__init__('calculator')
self.argument_a = 0.0
self.argument_b = 0.0
self.argument_operator = 0
self.argument_result = 0.0
self.argument_formula = ''
self.operator = ['+', '-', '*', '/']
self.callback_group = ReentrantCallbackGroup()
self.declare_parameter('qos_depth', 10)
qos_depth = self.get_parameter('qos_depth').value
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability=QoSDurabilityPolicy.VOLATILE)
self.arithmetic_argument_subscriber = self.create_subscription(
ArithmeticArgument,
'arithmetic_argument',
self.get_arithmetic_argument,
QOS_RKL10V,
callback_group=self.callback_group)
self.arithmetic_service_server = self.create_service(
ArithmeticOperator,
'arithmetic_operator',
self.get_arithmetic_operator,
callback_group=self.callback_group)
self.arithmetic_action_server = ActionServer(
self,
ArithmeticChecker,
'arithmetic_checker',
self.execute_checker,
callback_group=self.callback_group)
def get_arithmetic_argument(self, msg):
self.argument_a = msg.argument_a
self.argument_b = msg.argument_b
self.get_logger().info('Timestamp of the message: {0}'.format(msg.stamp))
self.get_logger().info('Subscribed argument a: {0}'.format(self.argument_a))
self.get_logger().info('Subscribed argument b: {0}'.format(self.argument_b))
def get_arithmetic_operator(self, request, response):
self.argument_operator = request.arithmetic_operator
self.argument_result = self.calculate_given_formula(
self.argument_a,
self.argument_b,
self.argument_operator)
response.arithmetic_result = self.argument_result
self.argument_formula = '{0} {1} {2} = {3}'.format(
self.argument_a,
self.operator[self.argument_operator-1],
self.argument_b,
self.argument_result)
self.get_logger().info(self.argument_formula)
return response
def calculate_given_formula(self, a, b, operator):
if operator == ArithmeticOperator.Request.PLUS:
self.argument_result = a + b
elif operator == ArithmeticOperator.Request.MINUS:
self.argument_result = a - b
elif operator == ArithmeticOperator.Request.MULTIPLY:
self.argument_result = a * b
elif operator == ArithmeticOperator.Request.DIVISION:
try:
self.argument_result = a / b
except ZeroDivisionError:
self.get_logger().error('ZeroDivisionError!')
self.argument_result = 0.0
return self.argument_result
else:
self.get_logger().error(
'Please make sure arithmetic operator(plus, minus, multiply, division).')
self.argument_result = 0.0
return self.argument_result
def execute_checker(self, goal_handle):
self.get_logger().info('Execute arithmetic_checker action!')
feedback_msg = ArithmeticChecker.Feedback()
feedback_msg.formula = []
total_sum = 0.0
goal_sum = goal_handle.request.goal_sum
while total_sum < goal_sum:
total_sum += self.argument_result
feedback_msg.formula.append(self.argument_formula)
self.get_logger().info('Feedback: {0}'.format(feedback_msg.formula))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = ArithmeticChecker.Result()
result.all_formula = feedback_msg.formula
result.total_sum = total_sum
return result
import time
from study_msg.action import ArithmeticChecker
from study_msg.msg import ArithmeticArgument
from study_msg.srv import ArithmeticOperator
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
from study_msg.action import ArithmeticChecker
from study_msg.msg import ArithmeticArgument
from study_msg.srv import ArithmeticOperator
이것들은 내가 만든 메시지에서 가져온다.
from rclpy.action import ActionServer : ROS 2에서 액션 서버를 생성하고 클라이언트 요청을 처리하는 데 사용
from rclpy.callback_groups import ReentrantCallbackGroup : 이게 진짜 중요한데 원래 기본 코드는 단일 스레드에서 작동이 되지만 현재 이것을 사용하게 되면 여러 스레드가 콜백이 동시에 실행이 되는것을 도와준다.
나머지는 안전성 그리고 노드 생성에 관련된 것이다.
def __init__(self):
super().__init__('calculator')
self.argument_a = 0.0
self.argument_b = 0.0
self.argument_operator = 0
self.argument_result = 0.0
self.argument_formula = ''
self.operator = ['+', '-', '*', '/']
self.callback_group = ReentrantCallbackGroup()
self.declare_parameter('qos_depth', 10)
qos_depth = self.get_parameter('qos_depth').value
지금 현재 변수들을 생성하고 산술연산자를 나타내는 정수 값, 계산 결과를 저장하는 변수 그리고 계산의 수식을 문자열로 저장하는 값들을 나열한다.
그리고 operator에 대한 수식값으로 print 결과값을 보여주는 것과 여러 콜백이 가능하도록 설정해준다.
나머지는 qos 즉 통신 품질에 대한 기본 설정이다.
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability=QoSDurabilityPolicy.VOLATILE)
이거는 전에 설명했고 추후 qos에 대해서 한번더 다루도록 하겠다.
self.arithmetic_argument_subscriber = self.create_subscription(
ArithmeticArgument,
'arithmetic_argument',
self.get_arithmetic_argument,
QOS_RKL10V,
callback_group=self.callback_group)
self.arithmetic_service_server = self.create_service(
ArithmeticOperator,
'arithmetic_operator',
self.get_arithmetic_operator,
callback_group=self.callback_group)
self.arithmetic_action_server = ActionServer(
self,
ArithmeticChecker,
'arithmetic_checker',
self.execute_checker,
callback_group=self.callback_group)
create_subscription, create_service, ActionServer : 이것들은 각 구독 기능 그리고 서버를 생성합니다.
그리고 각 처리할 메시지 타입 그리고 서버 이름 토픽 이름을 적는데 이거는 각 코드 마다 이름을 동일시 시킵니다.
그리고 이 코드에서 계산하는 것 즉 각 노드에서 가져온 값들을 계산하는 함수들을 사용합니다.
(ex. get_arithmetic_argument, get_arithmetic_operator ...)
그리고 토픽은 QOS로 안전성을 봐주고 콜백 그룹을 지정하여 병렬 실행 여부를 설정한다.
def get_arithmetic_argument(self, msg):
self.argument_a = msg.argument_a
self.argument_b = msg.argument_b
self.get_logger().info('Timestamp of the message: {0}'.format(msg.stamp))
self.get_logger().info('Subscribed argument a: {0}'.format(self.argument_a))
self.get_logger().info('Subscribed argument b: {0}'.format(self.argument_b))
토픽에서 전달된 메시지를 처리하는 역할로 ros2메시지인 데이터를 추출하고 출력
def get_arithmetic_operator(self, request, response):
self.argument_operator = request.arithmetic_operator
self.argument_result = self.calculate_given_formula(
self.argument_a,
self.argument_b,
self.argument_operator)
response.arithmetic_result = self.argument_result
self.argument_formula = '{0} {1} {2} = {3}'.format(
self.argument_a,
self.operator[self.argument_operator-1],
self.argument_b,
self.argument_result)
self.get_logger().info(self.argument_formula)
return response
self.argument_operator : 서비스에서 request 받은것 즉 1,2,3,4 에따라 +,-를 가져온다고 생각하면된다.
self.argument_a: 첫 번째 피연산자
self.argument_b: 두 번째 피연산자
self.argument_operator: 요청받은 연산자 이렇게 생각하면 된다.
arithmetic_result : 결과값을 저장한다.
그리고 그값에 맞게 위치를 배정한다.
그리고 결과값을 나오게 하고 결과값을 response한다.
def calculate_given_formula(self, a, b, operator):
if operator == ArithmeticOperator.Request.PLUS:
self.argument_result = a + b
elif operator == ArithmeticOperator.Request.MINUS:
self.argument_result = a - b
elif operator == ArithmeticOperator.Request.MULTIPLY:
self.argument_result = a * b
elif operator == ArithmeticOperator.Request.DIVISION:
try:
self.argument_result = a / b
except ZeroDivisionError:
self.get_logger().error('ZeroDivisionError!')
self.argument_result = 0.0
return self.argument_result
else:
self.get_logger().error(
'Please make sure arithmetic operator(plus, minus, multiply, division).')
self.argument_result = 0.0
return self.argument_result
각 연산자에 맞게 계산을 한다.
그것에 맞게 result 값을 나오게 하고
def execute_checker(self, goal_handle):
self.get_logger().info('Execute arithmetic_checker action!')
feedback_msg = ArithmeticChecker.Feedback()
feedback_msg.formula = []
total_sum = 0.0
goal_sum = goal_handle.request.goal_sum
while total_sum < goal_sum:
total_sum += self.argument_result
feedback_msg.formula.append(self.argument_formula)
self.get_logger().info('Feedback: {0}'.format(feedback_msg.formula))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = ArithmeticChecker.Result()
result.all_formula = feedback_msg.formula
result.total_sum = total_sum
return result
액션 서보의 콜백함수이고 그 피트백을 제공학여 최종결과를 반환한다.
먼저 액션이 실행됨을 알리는 것을 시작으로
ArithmeticChecker.Feedback() : 객체를 초기화 하고
feedback_msg.formula : 수식을 저장하는 리스트를 제작하여 수식을 추가하게 한다.
그리고 누적된값을 저장하는 변수와 목표 값을 정한다.
total_sum < goal_sum일 때 계속 값을 누적하며 feedback 리스트에 계속 전송한다.
그리고 액션작업이 성공적으로 이루어진것을 ros에게 알리고 결과 메시지 객체를 초기화하며 결과값을 저장하고 return 해준다.
import rclpy
from rclpy.executors import MultiThreadedExecutor
from py_test.calculator.calculator import Calculator
def main(args=None):
rclpy.init(args=args)
try:
calculator = Calculator()
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(calculator)
try:
executor.spin()
except KeyboardInterrupt:
calculator.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
executor.shutdown()
calculator.arithmetic_action_server.destroy()
calculator.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
나머지는 기존에 사용하던 것이고
MultiThreadedExecutor 이게 진짜 중요한데 이것이 ROS 2에서 콜백을 병렬로 처리할 수 있는 실행기라고 생각하면 된다.
여기서 궁금한게 ReentrantCallbackGroup 일텐데 여기서
ReentrantCallbackGroup는 병렬 실행을 허용하는 콜백 그룹을 설정함으로써 단독으로는 병렬 실행이 불가능하고 실행기가 이를 병렬로 처리하고 MultiThreadedExecutor는 병렬 처리를 실제로 실행하는 환경을 제공함으로써 ReentrantCallbackGroup 기반으로 병렬작업을 수행한다.
rclpy.init(args=args) : 똑같이 클라이언트를 초기화하며 실행환경을 만들어준다.
그리고 노드를 생성하고 MultiThreadedExecutor(num_threads=4) 최대 4개의 스레드를 사용하도록 한다.
executor.add_node(calculator) : 노드를 실행하기위해 추가하고 콜백과 비동기 작업이 시행되도록한다.
executor즉 4개의 스레드를 spin하게 하여 돌게한다.
그 이후는 똑같다.
rclpy.init(args=args) 여기서 args=args 이게 명령줄을 사용하기 위해서 이게 사용된다.
죠와용