액션 목표를 지정하는 액션 클라이언트와 목표를 받아 동작을 수행하면서 중간 결과값에 해당하는 액션 피드백, 최종결과에 해당하는 액션 결과를 전송하는 액션 서버에 대해 살펴보고자 한다.

def __init__(self):
super().__init__('checker')
self.arithmetic_action_client = ActionClient(
self,
ArithmeticChecker,
'arithmetic_checker')
ActionClient를 생성하여 ArithmeticChecker 액션 타입을 arithmetic_chcker라는 이름의 액션 클라이언트로 생성ArithmeticChecker는 인터페이스 패키지에 action으로 정의되어 있음✔ 액션 서버 준비
def send_goal_total_sum(self, goal_sum):
wait_count = 1
while not self.arithmetic_action_client.wait_for_server(timeout_sec=0.1):
if wait_count > 3:
self.get_logger().warning('Arithmetic action server is not available.')
return False
wait_count += 1
goal_sum : 목표로 설정할 값, 서버에서 계산할 총합self.arithmetic_action_client.wait_for_server(timeout_sec=0.1) : 액션서버가 준비될 때까지 0.1초 기다렸다가 다시 서버 동작 여부를 파악 -> 3번 이상 확인했는데도 준비되지 않은 경우 액션서버가 사용 불가능하다고 파악✔ 목표 메시지 생성
goal_msg = ArithmeticChecker.Goal()
goal_msg.goal_sum = float(goal_sum)
goal_msg = ArithmeticChecker.Goal() : 액션의 목표 메시지 객체 생성goal_sum을 gola_msg에 float형태로 저장
self.send_goal_future = self.arithmetic_action_client.send_goal_async(
goal_msg,
feedback_callback=self.get_arithmetic_action_feedback)
self.send_goal_future.add_done_callback(self.get_arithmetic_action_goal)
return True
send_goal_async() : goal_msg를 비동기방식으로 전송add_done_callback() : 목표 전송이 완료되고 서버가 수락하거나 거부하였는지 결과 처리 return True : 목표가 정상적으로 전송되었음을 의미def get_arithmetic_action_goal(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warning('Action goal rejected.')
return
self.get_logger().info('Action goal accepted.')
self.action_result_future = goal_handle.get_result_async()
self.action_result_future.add_done_callback(self.get_arithmetic_action_result
goal_handle : 목표가 서버에 수락되었는지 거부되었는지 정보 self.action_result_future.add_done_callback(self.get_arithmetic_action_result : 액션 목표가 전달되었다면 액션 결과를 콜백 함수로 선언def get_arithmetic_action_feedback(self, feedback_msg):
action_feedback = feedback_msg.feedback.formula
self.get_logger().info('Action feedback: {0}'.format(action_feedback))
formula : 서버가 작업 수행 도중 생성되는 연산의 결과를 담은 리스트def get_arithmetic_action_result(self, future):
action_status = future.result().status
action_result = future.result().result
if action_status == GoalStatus.STATUS_SUCCEEDED:
self.get_logger().info('Action succeeded!')
self.get_logger().info(
'Action result(all formula): {0}'.format(action_result.all_formula))
self.get_logger().info(
'Action result(total sum): {0}'.format(action_result.total_sum))
else:
self.get_logger().warning(
'Action failed with status: {0}'.format(action_status))
from action_msgs.msg import GoalStatus
from ros_study_msgs.action import ArithmeticChecker
from rclpy.action import ActionClient
from rclpy.node import Node
class Checker(Node):
def __init__(self):
super().__init__('checker')
self.arithmetic_action_client = ActionClient(
self,
ArithmeticChecker,
'arithmetic_checker')
def send_goal_total_sum(self, goal_sum):
wait_count = 1
while not self.arithmetic_action_client.wait_for_server(timeout_sec=0.1):
if wait_count > 3:
self.get_logger().warning('Arithmetic action server is not available.')
return False
wait_count += 1
goal_msg = ArithmeticChecker.Goal()
goal_msg.goal_sum = (float)(goal_sum)
self.send_goal_future = self.arithmetic_action_client.send_goal_async(
goal_msg,
feedback_callback=self.get_arithmetic_action_feedback)
self.send_goal_future.add_done_callback(self.get_arithmetic_action_goal)
return True
def get_arithmetic_action_goal(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().warning('Action goal rejected.')
return
self.get_logger().info('Action goal accepted.')
self.action_result_future = goal_handle.get_result_async()
self.action_result_future.add_done_callback(self.get_arithmetic_action_result)
def get_arithmetic_action_feedback(self, feedback_msg):
action_feedback = feedback_msg.feedback.formula
self.get_logger().info('Action feedback: {0}'.format(action_feedback))
def get_arithmetic_action_result(self, future):
action_status = future.result().status
action_result = future.result().result
if action_status == GoalStatus.STATUS_SUCCEEDED:
self.get_logger().info('Action succeeded!')
self.get_logger().info(
'Action result(all formula): {0}'.format(action_result.all_formula))
self.get_logger().info(
'Action result(total sum): {0}'.format(action_result.total_sum))
else:
self.get_logger().warning(
'Action failed with status: {0}'.format(action_status))
self.arithmetic_action_server = ActionServer(
self,
ArithmeticChecker,
'arithmetic_checker',
self.execute_checker,
callback_group=self.callback_group)
ArithmeticChecker 액션 타입을 arithmetic_checker 라는 이름으로 액션 서버 생성실제 액션 목표를 받은 후 실행되기 때문에 액션 서버에서 중요한 부분이다.
def execute_checker():
self.get_logger().info('Execute arithmetic_ckecker action!')
feedback_msg = ArithmeticChecker.Feedback()
feedback_msg.formula = [] #연산 과정이 담긴 리스트 초기화
total_sum = 0.0 # 누적 합계 초기화
goal_sum = goal_handle.request.goal_sum
feedback_msg = ArithmeticChecker.Feedback() : 피드백 메시지 객체를 성생하여 클라이언트에게 피드백 전송goal_handle.requsest.goal_sum : 클라이언트로부터 목표 값goal_sum을 가져와 총합이 goal_sum에 도달할때까지 연산 수행while total sum > goal_sum:
total_sum += self.argument_result
feedback_msg.formula.append(self.argument_formula)
self.get_logger().info('Feedback: {0}'.format(feedback_msg.formula)))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
total_sum 이 goal_sum 에 도달할때까지 연산 반복goal_handle.publish_feedback(feedback_msg) : 피드백 메시지를 클라이언트로 전송하여 실시간으로 진행상황 확인time.sleep(1) : 1초 동안 대기 후 피드백 전송✔ 목표 완료 및 결과 반환
goal_handle.succeed()
result = ArithmeticChecker.Result()
result.all_formula = feedback_msg.formula
result.total_sum = total_sum # 최종 누적 합계
return result
result = ArithmeticChecker.Result() : 최정 결과 메시지 객체 생성return result : 최종 결과를 반환하여 서비스 클라이언트에게 전달✔ parser : 명령줄에 전달되는 인자를 정의하고 파싱(데이터를 유의미한 데이터 구조로 변환)하기 위한 객체
def main(argv=sys.argv[1:]):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'-g',
'--goal_total_sum',
type=int,
default=50,
help='Target goal value of total sum')
parser.add_argument(
'argv', nargs=argparse.REMAINDER,
help='Pass arbitrary arguments to the executable')
args = parser.parse_args()
argparse가 --goal_total_sum 이라는 인자를 찾아 int 타입으로 변환default=50 : 인자가 제공되지 않을 경우 사용할 기본값MultiThreadedExecutor 는 ROS2에서 여러 개의 콜백을 동시에 실행하여 다른 콜백이 지연되지 않고 즉시 실행될 수 있도록 한다. 액션 서버가 긴 시간 동안 목표를 처리하는 동안 서비스 요청, 토픽 메시지 처리 등의 다른 콜백이 동시에 처리된다.
MutuallyExclusiveCallbackGroup
self.callback_group = MutuallyExclusiveCallbackGroup()
동일한 그룹에 속한 콜백들이 서로 동시에 실행되지 않도록 한다. 순차적으로 실행이 필요한 환경에서 사용한다.
ReentrantCallbackGroup
self.callback_group = ReentrantCallbackGroup()
동일한 콜백 그룹 내에서 여러 콜백이 동시에 실행되도록 한다.
calculator 에서는 토픽 구독, 서비스 요청, 액션 서버 3개의 콜백이 발생한다. 각 콜백은 해당 이벤트가 발생될 떄 호출되어 동시에 처리된다.
✔ 콜백 그룹 할당
calculator 노드에서 토픽 구독, 서비스 요청, 액선 서버의 3가지 동작이 요구되므로 콜백 그룹을 할당한다.
class Calculator(Node):
def __init__(self):
super().__init__('calculator')
self.argument_a = 0.0
self.argument_b = 0.0
self.argument_operator = 0
self.argument_result = 0.0
self.argument_formula = ''
self.operator = ['+', '-', '*', '/']
self.callback_group = ReentrantCallbackGroup()
self.callback_group = ReentrantCallbackGroup() : 동일한 콜백 그룹 내에서 여러 콜백이 동시에 실행될 수 있도록 허용✔ 토픽 구독 콜백
self.arithmetic_argument_subscriber = self.create_subscription(
ArithmeticArgument,
'arithmetic_argument',
self.get_arithmetic_argument,
QOS_RKL10V,
callback_group=self.callback_group)
✔ 서비스 서버 콜백
self.arithmetic_service_server = self.create_service(
ArithmeticOperator,
'arithmetic_operator',
self.get_arithmetic_operator,
callback_group=self.callback_group)
✔ 액션 서버 콜백
self.arithmetic_action_server = ActionServer(
self,
ArithmeticChecker,
'arithmetic_checker',
self.execute_checker,
callback_group=self.callback_group)
import rclpy
from rclpy.executors import MultiThreadedExecutor
from ex_calculator.calculator.calculator import Calculator
def main(args=None):
rclpy.init(args=args)
try:
calculator = Calculator()
executor = MultiThreadedExecutor(num_threads=4)
executor.add_node(calculator)
try:
executor.spin()
except KeyboardInterrupt:
calculator.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
executor.shutdown()
calculator.arithmetic_action_server.destroy()
calculator.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()
MultiThreadedExecutor(num_threads=4) : 현재는 3개의 콜백이 있지만, 추가 콜백이 생겼을 때 즉시 처리하기 위해 4개를 사용한다.💡 threads 수를 지정하지 않으면?
cpu 코어 개수만큼 스레드가 생성된다. 노드에서 필요한 작업 이상의 스레드가 생성되면 남는 스레드는 대기 상태로 메모리를 소비하기 때문에 자원 낭비가 발생할 수 있다.

Action goal이 수락 되고 연산 과정이 Action feedback 으로 나타낸다. 누적 한계치를 넘자 Action result 를 나타내고 동작이 종료된다.
import time
from ros_study_msgs.action import ArithmeticChecker
from ros_study_msgs.msg import ArithmeticArgument
from ros_study_msgs.srv import ArithmeticOperator
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
class Calculator(Node):
def __init__(self):
super().__init__('calculator')
self.argument_a = 0.0
self.argument_b = 0.0
self.argument_operator = 0
self.argument_result = 0.0
self.argument_formula = ''
self.operator = ['+', '-', '*', '/']
self.callback_group = ReentrantCallbackGroup()
self.declare_parameter('qos_depth', 10)
qos_depth = self.get_parameter('qos_depth').value
QOS_RKL10V = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
history=QoSHistoryPolicy.KEEP_LAST,
depth=qos_depth,
durability=QoSDurabilityPolicy.VOLATILE)
self.arithmetic_argument_subscriber = self.create_subscription(
ArithmeticArgument,
'arithmetic_argument',
self.get_arithmetic_argument,
QOS_RKL10V,
callback_group=self.callback_group)
self.arithmetic_service_server = self.create_service(
ArithmeticOperator,
'arithmetic_operator',
self.get_arithmetic_operator,
callback_group=self.callback_group)
self.arithmetic_action_server = ActionServer(
self,
ArithmeticChecker,
'arithmetic_checker',
self.execute_checker,
callback_group=self.callback_group)
def get_arithmetic_argument(self, msg):
self.argument_a = msg.argument_a
self.argument_b = msg.argument_b
self.get_logger().info('Timestamp of the message: {0}'.format(msg.stamp))
self.get_logger().info('Subscribed argument a: {0}'.format(self.argument_a))
self.get_logger().info('Subscribed argument b: {0}'.format(self.argument_b))
def get_arithmetic_operator(self, request, response):
self.argument_operator = request.arithmetic_operator
self.argument_result = self.calculate_given_formula(
self.argument_a,
self.argument_b,
self.argument_operator)
response.arithmetic_result = self.argument_result
self.argument_formula = '{0} {1} {2} = {3}'.format(
self.argument_a,
self.operator[self.argument_operator-1],
self.argument_b,
self.argument_result)
self.get_logger().info(self.argument_formula)
return response
def calculate_given_formula(self, a, b, operator):
if operator == ArithmeticOperator.Request.PLUS:
self.argument_result = a + b
elif operator == ArithmeticOperator.Request.MINUS:
self.argument_result = a - b
elif operator == ArithmeticOperator.Request.MULTIPLY:
self.argument_result = a * b
elif operator == ArithmeticOperator.Request.DIVISION:
try:
self.argument_result = a / b
except ZeroDivisionError:
self.get_logger().error('ZeroDivisionError!')
self.argument_result = 0.0
return self.argument_result
else:
self.get_logger().error(
'Please make sure arithmetic operator(plus, minus, multiply, division).')
self.argument_result = 0.0
return self.argument_result
def execute_checker(self, goal_handle):
self.get_logger().info('Execute arithmetic_checker action!')
feedback_msg = ArithmeticChecker.Feedback()
feedback_msg.formula = []
total_sum = 0.0
goal_sum = goal_handle.request.goal_sum
while total_sum < goal_sum:
total_sum += self.argument_result
feedback_msg.formula.append(self.argument_formula)
self.get_logger().info('Feedback: {0}'.format(feedback_msg.formula))
goal_handle.publish_feedback(feedback_msg)
time.sleep(1)
goal_handle.succeed()
result = ArithmeticChecker.Result()
result.all_formula = feedback_msg.formula
result.total_sum = total_sum
return result
import argparse
import sys
import rclpy
from ex_calculator.checker.checker import Checker
def main(argv=sys.argv[1:]):
parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
parser.add_argument(
'-g',
'--goal_total_sum',
type=int,
default=50,
help='Target goal value of total sum')
parser.add_argument(
'argv', nargs=argparse.REMAINDER,
help='Pass arbitrary arguments to the executable')
args = parser.parse_args()
rclpy.init(args=args.argv)
try:
checker = Checker()
checker.send_goal_total_sum(args.goal_total_sum)
try:
rclpy.spin(checker)
except KeyboardInterrupt:
checker.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
checker.arithmetic_action_client.destroy()
checker.destroy_node()
finally:
rclpy.shutdown()
if __name__ == '__main__':
main()