액션 프로그래밍

Hyuna·2024년 10월 22일
0

ROS2

목록 보기
8/15
post-thumbnail

📕 참고: ROS2로 시작하는 로봇 프로그래밍, topic_service_action_rclpy_example




액션 목표를 지정하는 액션 클라이언트와 목표를 받아 동작을 수행하면서 중간 결과값에 해당하는 액션 피드백, 최종결과에 해당하는 액션 결과를 전송하는 액션 서버에 대해 살펴보고자 한다.

calculator - 액션 서버

  • 연산결과를 arithmetic_checker 이름으로 액션 피드백을 checker로 전송

checker - 액션 클라이언트

  • 연산값의 합이 액션 목표를 넘기면 최종 연산 결과를 터미널에 표시
  • 누적 한계치는 50으로 설정


checker/ 액션 클라이언트 코드


액션 클라이언트 생성

def __init__(self):
    super().__init__('checker')
    self.arithmetic_action_client = ActionClient(
      self,
      ArithmeticChecker,
      'arithmetic_checker')
  • ActionClient를 생성하여 ArithmeticChecker 액션 타입을 arithmetic_chcker라는 이름의 액션 클라이언트로 생성
  • ArithmeticChecker는 인터페이스 패키지에 action으로 정의되어 있음

목표 전송

✔ 액션 서버 준비

def send_goal_total_sum(self, goal_sum):
    wait_count = 1
    while not self.arithmetic_action_client.wait_for_server(timeout_sec=0.1):
        if wait_count > 3:
            self.get_logger().warning('Arithmetic action server is not available.')
            return False
        wait_count += 1
    
  • goal_sum : 목표로 설정할 값, 서버에서 계산할 총합
  • self.arithmetic_action_client.wait_for_server(timeout_sec=0.1) : 액션서버가 준비될 때까지 0.1초 기다렸다가 다시 서버 동작 여부를 파악 -> 3번 이상 확인했는데도 준비되지 않은 경우 액션서버가 사용 불가능하다고 파악

✔ 목표 메시지 생성

    goal_msg = ArithmeticChecker.Goal()
    goal_msg.goal_sum = float(goal_sum)
    
  • goal_msg = ArithmeticChecker.Goal() : 액션의 목표 메시지 객체 생성
  • goal_sumgola_msgfloat형태로 저장

✔ 비동기적으로 목표 전송

    self.send_goal_future = self.arithmetic_action_client.send_goal_async(
        goal_msg,
        feedback_callback=self.get_arithmetic_action_feedback)
    self.send_goal_future.add_done_callback(self.get_arithmetic_action_goal)
    return True
 
  • send_goal_async() : goal_msg를 비동기방식으로 전송
  • add_done_callback() : 목표 전송이 완료되고 서버가 수락하거나 거부하였는지 결과 처리
  • return True : 목표가 정상적으로 전송되었음을 의미

목표 수락 여부 확인

def get_arithmetic_action_goal(self, future):
    goal_handle = future.result()
    if not goal_handle.accepted:
        self.get_logger().warning('Action goal rejected.')
        return
    self.get_logger().info('Action goal accepted.')
    self.action_result_future = goal_handle.get_result_async()
    self.action_result_future.add_done_callback(self.get_arithmetic_action_result
  • goal_handle : 목표가 서버에 수락되었는지 거부되었는지 정보
  • self.action_result_future.add_done_callback(self.get_arithmetic_action_result : 액션 목표가 전달되었다면 액션 결과를 콜백 함수로 선언

피드백 처리

def get_arithmetic_action_feedback(self, feedback_msg):
    action_feedback = feedback_msg.feedback.formula
    self.get_logger().info('Action feedback: {0}'.format(action_feedback))
  • formula : 서버가 작업 수행 도중 생성되는 연산의 결과를 담은 리스트

결과 처리

def get_arithmetic_action_result(self, future):
    action_status = future.result().status
    action_result = future.result().result
    if action_status == GoalStatus.STATUS_SUCCEEDED:
        self.get_logger().info('Action succeeded!')
        self.get_logger().info(
            'Action result(all formula): {0}'.format(action_result.all_formula))
        self.get_logger().info(
            'Action result(total sum): {0}'.format(action_result.total_sum))
    else:
        self.get_logger().warning(
            'Action failed with status: {0}'.format(action_status))
  • 목표가 성공하며 결과를 로그로 출력

전체 코드


from action_msgs.msg import GoalStatus
from ros_study_msgs.action import ArithmeticChecker
from rclpy.action import ActionClient
from rclpy.node import Node


class Checker(Node):

    def __init__(self):
        super().__init__('checker')
        self.arithmetic_action_client = ActionClient(
          self,
          ArithmeticChecker,
          'arithmetic_checker')

    def send_goal_total_sum(self, goal_sum):
        wait_count = 1
        while not self.arithmetic_action_client.wait_for_server(timeout_sec=0.1):
            if wait_count > 3:
                self.get_logger().warning('Arithmetic action server is not available.')
                return False
            wait_count += 1
        goal_msg = ArithmeticChecker.Goal()
        goal_msg.goal_sum = (float)(goal_sum)
        self.send_goal_future = self.arithmetic_action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.get_arithmetic_action_feedback)
        self.send_goal_future.add_done_callback(self.get_arithmetic_action_goal)
        return True

    def get_arithmetic_action_goal(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().warning('Action goal rejected.')
            return
        self.get_logger().info('Action goal accepted.')
        self.action_result_future = goal_handle.get_result_async()
        self.action_result_future.add_done_callback(self.get_arithmetic_action_result)

    def get_arithmetic_action_feedback(self, feedback_msg):
        action_feedback = feedback_msg.feedback.formula
        self.get_logger().info('Action feedback: {0}'.format(action_feedback))

    def get_arithmetic_action_result(self, future):
        action_status = future.result().status
        action_result = future.result().result
        if action_status == GoalStatus.STATUS_SUCCEEDED:
            self.get_logger().info('Action succeeded!')
            self.get_logger().info(
                'Action result(all formula): {0}'.format(action_result.all_formula))
            self.get_logger().info(
                'Action result(total sum): {0}'.format(action_result.total_sum))
        else:
            self.get_logger().warning(
                'Action failed with status: {0}'.format(action_status))


calculator/액션 서버 코드


액션 서버 생성

self.arithmetic_action_server = ActionServer(
    self,
    ArithmeticChecker,
    'arithmetic_checker',
    self.execute_checker,
    callback_group=self.callback_group)

  • ArithmeticChecker 액션 타입을 arithmetic_checker 라는 이름으로 액션 서버 생성

목표 처리 함수

실제 액션 목표를 받은 후 실행되기 때문에 액션 서버에서 중요한 부분이다.

def execute_checker():
    self.get_logger().info('Execute arithmetic_ckecker action!')
    feedback_msg = ArithmeticChecker.Feedback()
    feedback_msg.formula = [] #연산 과정이 담긴 리스트 초기화
    total_sum = 0.0 # 누적 합계 초기화
    goal_sum = goal_handle.request.goal_sum
  
  • feedback_msg = ArithmeticChecker.Feedback() : 피드백 메시지 객체를 성생하여 클라이언트에게 피드백 전송
  • goal_handle.requsest.goal_sum : 클라이언트로부터 목표 값goal_sum을 가져와 총합이 goal_sum에 도달할때까지 연산 수행


    ✔ 목표 처리 및 피드백 전송
while total sum > goal_sum:
    total_sum += self.argument_result
    feedback_msg.formula.append(self.argument_formula)
    self.get_logger().info('Feedback: {0}'.format(feedback_msg.formula)))
    goal_handle.publish_feedback(feedback_msg)
    time.sleep(1)
  • total_sumgoal_sum 에 도달할때까지 연산 반복
  • goal_handle.publish_feedback(feedback_msg) : 피드백 메시지를 클라이언트로 전송하여 실시간으로 진행상황 확인
  • time.sleep(1) : 1초 동안 대기 후 피드백 전송

✔ 목표 완료 및 결과 반환

goal_handle.succeed()
result = ArithmeticChecker.Result()
result.all_formula = feedback_msg.formula
result.total_sum = total_sum # 최종 누적 합계
return result
  • result = ArithmeticChecker.Result() : 최정 결과 메시지 객체 생성
  • return result : 최종 결과를 반환하여 서비스 클라이언트에게 전달


ckecker/main


parser : 명령줄에 전달되는 인자를 정의하고 파싱(데이터를 유의미한 데이터 구조로 변환)하기 위한 객체


def main(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '-g',
        '--goal_total_sum',
        type=int,
        default=50,
        help='Target goal value of total sum')
    parser.add_argument(
        'argv', nargs=argparse.REMAINDER,
        help='Pass arbitrary arguments to the executable')
    args = parser.parse_args()

 
  • argparse--goal_total_sum 이라는 인자를 찾아 int 타입으로 변환
  • default=50 : 인자가 제공되지 않을 경우 사용할 기본값


MultiThreadedExecutor


MultiThreadedExecutor 는 ROS2에서 여러 개의 콜백을 동시에 실행하여 다른 콜백이 지연되지 않고 즉시 실행될 수 있도록 한다. 액션 서버가 긴 시간 동안 목표를 처리하는 동안 서비스 요청, 토픽 메시지 처리 등의 다른 콜백이 동시에 처리된다.

  • MutuallyExclusiveCallbackGroup
    self.callback_group = MutuallyExclusiveCallbackGroup()
    동일한 그룹에 속한 콜백들이 서로 동시에 실행되지 않도록 한다. 순차적으로 실행이 필요한 환경에서 사용한다.

  • ReentrantCallbackGroup
    self.callback_group = ReentrantCallbackGroup()
    동일한 콜백 그룹 내에서 여러 콜백이 동시에 실행되도록 한다.


calculator

calculator 에서는 토픽 구독, 서비스 요청, 액션 서버 3개의 콜백이 발생한다. 각 콜백은 해당 이벤트가 발생될 떄 호출되어 동시에 처리된다.

✔ 콜백 그룹 할당
calculator 노드에서 토픽 구독, 서비스 요청, 액선 서버의 3가지 동작이 요구되므로 콜백 그룹을 할당한다.

class Calculator(Node):

    def __init__(self):
        super().__init__('calculator')
        self.argument_a = 0.0
        self.argument_b = 0.0
        self.argument_operator = 0
        self.argument_result = 0.0
        self.argument_formula = ''
        self.operator = ['+', '-', '*', '/']
        self.callback_group = ReentrantCallbackGroup()
  • self.callback_group = ReentrantCallbackGroup() : 동일한 콜백 그룹 내에서 여러 콜백이 동시에 실행될 수 있도록 허용

✔ 토픽 구독 콜백

self.arithmetic_argument_subscriber = self.create_subscription(
    ArithmeticArgument,
    'arithmetic_argument',
    self.get_arithmetic_argument,
    QOS_RKL10V,
    callback_group=self.callback_group)

✔ 서비스 서버 콜백

self.arithmetic_service_server = self.create_service(
    ArithmeticOperator,
    'arithmetic_operator',
    self.get_arithmetic_operator,
    callback_group=self.callback_group)

✔ 액션 서버 콜백

self.arithmetic_action_server = ActionServer(
    self,
    ArithmeticChecker,
    'arithmetic_checker',
    self.execute_checker,
    callback_group=self.callback_group)

calculator/main

import rclpy
from rclpy.executors import MultiThreadedExecutor

from ex_calculator.calculator.calculator import Calculator


def main(args=None):
    rclpy.init(args=args)
    try:
        calculator = Calculator()
        executor = MultiThreadedExecutor(num_threads=4)
        executor.add_node(calculator)
        try:
            executor.spin()
        except KeyboardInterrupt:
            calculator.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            executor.shutdown()
            calculator.arithmetic_action_server.destroy()
            calculator.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()
  • MultiThreadedExecutor(num_threads=4) : 현재는 3개의 콜백이 있지만, 추가 콜백이 생겼을 때 즉시 처리하기 위해 4개를 사용한다.

💡 threads 수를 지정하지 않으면?

cpu 코어 개수만큼 스레드가 생성된다. 노드에서 필요한 작업 이상의 스레드가 생성되면 남는 스레드는 대기 상태로 메모리를 소비하기 때문에 자원 낭비가 발생할 수 있다.


실행


Action goal이 수락 되고 연산 과정이 Action feedback 으로 나타낸다. 누적 한계치를 넘자 Action result 를 나타내고 동작이 종료된다.



전체 코드


calculator

import time

from ros_study_msgs.action import ArithmeticChecker
from ros_study_msgs.msg import ArithmeticArgument
from ros_study_msgs.srv import ArithmeticOperator
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy


class Calculator(Node):

    def __init__(self):
        super().__init__('calculator')
        self.argument_a = 0.0
        self.argument_b = 0.0
        self.argument_operator = 0
        self.argument_result = 0.0
        self.argument_formula = ''
        self.operator = ['+', '-', '*', '/']
        self.callback_group = ReentrantCallbackGroup()

        self.declare_parameter('qos_depth', 10)
        qos_depth = self.get_parameter('qos_depth').value

        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)

        self.arithmetic_argument_subscriber = self.create_subscription(
            ArithmeticArgument,
            'arithmetic_argument',
            self.get_arithmetic_argument,
            QOS_RKL10V,
            callback_group=self.callback_group)

        self.arithmetic_service_server = self.create_service(
            ArithmeticOperator,
            'arithmetic_operator',
            self.get_arithmetic_operator,
            callback_group=self.callback_group)

        self.arithmetic_action_server = ActionServer(
            self,
            ArithmeticChecker,
            'arithmetic_checker',
            self.execute_checker,
            callback_group=self.callback_group)

    def get_arithmetic_argument(self, msg):
        self.argument_a = msg.argument_a
        self.argument_b = msg.argument_b
        self.get_logger().info('Timestamp of the message: {0}'.format(msg.stamp))
        self.get_logger().info('Subscribed argument a: {0}'.format(self.argument_a))
        self.get_logger().info('Subscribed argument b: {0}'.format(self.argument_b))

    def get_arithmetic_operator(self, request, response):
        self.argument_operator = request.arithmetic_operator
        self.argument_result = self.calculate_given_formula(
            self.argument_a,
            self.argument_b,
            self.argument_operator)
        response.arithmetic_result = self.argument_result
        self.argument_formula = '{0} {1} {2} = {3}'.format(
                self.argument_a,
                self.operator[self.argument_operator-1],
                self.argument_b,
                self.argument_result)
        self.get_logger().info(self.argument_formula)
        return response

    def calculate_given_formula(self, a, b, operator):
        if operator == ArithmeticOperator.Request.PLUS:
            self.argument_result = a + b
        elif operator == ArithmeticOperator.Request.MINUS:
            self.argument_result = a - b
        elif operator == ArithmeticOperator.Request.MULTIPLY:
            self.argument_result = a * b
        elif operator == ArithmeticOperator.Request.DIVISION:
            try:
                self.argument_result = a / b
            except ZeroDivisionError:
                self.get_logger().error('ZeroDivisionError!')
                self.argument_result = 0.0
                return self.argument_result
        else:
            self.get_logger().error(
                'Please make sure arithmetic operator(plus, minus, multiply, division).')
            self.argument_result = 0.0
        return self.argument_result

    def execute_checker(self, goal_handle):
        self.get_logger().info('Execute arithmetic_checker action!')
        feedback_msg = ArithmeticChecker.Feedback()
        feedback_msg.formula = []
        total_sum = 0.0
        goal_sum = goal_handle.request.goal_sum
        while total_sum < goal_sum:
            total_sum += self.argument_result
            feedback_msg.formula.append(self.argument_formula)
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.formula))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)
        goal_handle.succeed()
        result = ArithmeticChecker.Result()
        result.all_formula = feedback_msg.formula
        result.total_sum = total_sum
        return result

checker/main

import argparse
import sys

import rclpy

from ex_calculator.checker.checker import Checker


def main(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '-g',
        '--goal_total_sum',
        type=int,
        default=50,
        help='Target goal value of total sum')
    parser.add_argument(
        'argv', nargs=argparse.REMAINDER,
        help='Pass arbitrary arguments to the executable')
    args = parser.parse_args()

    rclpy.init(args=args.argv)
    try:
        checker = Checker()
        checker.send_goal_total_sum(args.goal_total_sum)
        try:
            rclpy.spin(checker)
        except KeyboardInterrupt:
            checker.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            checker.arithmetic_action_client.destroy()
            checker.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()

0개의 댓글