ROS2 package - checker(5)

이준혁·2024년 12월 24일

ROS2

목록 보기
9/14

제가 작성하는 이 글은 제가 공부하기 위해 작성하고 제 학교 후배들에게 조금이라도 ros에 대해서 접하게 하고자 작성하게 되었습니다. 그러니 만약 틀린내용이나 그러니 제가 부족한 부분이 있을수 있으니 주의해주면서 봐주시면 감사하겠습니다 -Happy Lee-


다음 action 부분에 Checker 입니다.

from action_msgs.msg import GoalStatus
from study_msg.action import ArithmeticChecker
from rclpy.action import ActionClient
from rclpy.node import Node


class Checker(Node):

    def __init__(self):
        super().__init__('checker')
        self.arithmetic_action_client = ActionClient(
          self,
          ArithmeticChecker,
          'arithmetic_checker')

    def send_goal_total_sum(self, goal_sum):
        wait_count = 1
        while not self.arithmetic_action_client.wait_for_server(timeout_sec=0.1):
            if wait_count > 3:
                self.get_logger().warning('Arithmetic action server is not available.')
                return False
            wait_count += 1
        goal_msg = ArithmeticChecker.Goal()
        goal_msg.goal_sum = (float)(goal_sum)
        self.send_goal_future = self.arithmetic_action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.get_arithmetic_action_feedback)
        self.send_goal_future.add_done_callback(self.get_arithmetic_action_goal)
        return True

    def get_arithmetic_action_goal(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().warning('Action goal rejected.')
            return
        self.get_logger().info('Action goal accepted.')
        self.action_result_future = goal_handle.get_result_async()
        self.action_result_future.add_done_callback(self.get_arithmetic_action_result)

    def get_arithmetic_action_feedback(self, feedback_msg):
        action_feedback = feedback_msg.feedback.formula
        self.get_logger().info('Action feedback: {0}'.format(action_feedback))

    def get_arithmetic_action_result(self, future):
        action_status = future.result().status
        action_result = future.result().result
        if action_status == GoalStatus.STATUS_SUCCEEDED:
            self.get_logger().info('Action succeeded!')
            self.get_logger().info(
                'Action result(all formula): {0}'.format(action_result.all_formula))
            self.get_logger().info(
                'Action result(total sum): {0}'.format(action_result.total_sum))
        else:
            self.get_logger().warning(
                'Action failed with status: {0}'.format(action_status))

라이브러리

from action_msgs.msg import GoalStatus : ROS 2의 action_msgs 패키지에서 제공하는 메시지 타입인 GoalStatus를 가져오는 부분

첫번째 함수

    def send_goal_total_sum(self, goal_sum):
        wait_count = 1
        while not self.arithmetic_action_client.wait_for_server(timeout_sec=0.1):
            if wait_count > 3:
                self.get_logger().warning('Arithmetic action server is not available.')
                return False
            wait_count += 1
        goal_msg = ArithmeticChecker.Goal()
        goal_msg.goal_sum = (float)(goal_sum)
        self.send_goal_future = self.arithmetic_action_client.send_goal_async(
            goal_msg,
            feedback_callback=self.get_arithmetic_action_feedback)
        self.send_goal_future.add_done_callback(self.get_arithmetic_action_goal)
        return True

처음에는 그냥 wait_count값으로 액션 클라이언트가 액션 서버러 통신준비를 기다리고 있는 것이다.

목표메시지 생성 / goal_msg = ArithmeticChecker.Goal()
ArithmeticChecker값을 목표 메시지 타입으로 만들고 사용자가 입력받은 goal_sum을 float로 변환한후 메시지의 goal_sum으로 설정한다.

목표 전송 / .send_goal_async
self.arithmetic_action_client.send_goal_async : 여기서 비동기적으로 전송하는 것으로 goal_msg값을 넣고 피드백 callback을 get_arithmetic_action_feedback을 사용합니다.
여기서 피드백이 필요한것은 중간마다 진행상태를 알리기 위해 사용되는것을 말한다.

목표 전송완료 등록 / .add_done_callback
목표 전송 작업이 완료된 후 호출될 콜백 메서드를 등록합니다.

결과반환
return True를 하여 결과를 반환한다.

두번째 함수

    def get_arithmetic_action_goal(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().warning('Action goal rejected.')
            return
        self.get_logger().info('Action goal accepted.')
        self.action_result_future = goal_handle.get_result_async()
        self.action_result_future.add_done_callback(self.get_arithmetic_action_result)

ROS 2 액션 클라이언트에서 비동기적으로 목표(goal)를 전송한 후, 목표가 서버에 의해 수락되었는지 확인하고, 결과(result)를 처리하는 역할을 한다.

목표 핸들 가져오기
future.result()를 호출하면 GoalHandle 객체가 반환됩니다

그이후 수락 된지 확인하는 절차이고 호출될 콜백 메서드를 등록합니다.

세번째, 네번째 함수

  def get_arithmetic_action_feedback(self, feedback_msg):
        action_feedback = feedback_msg.feedback.formula
        self.get_logger().info('Action feedback: {0}'.format(action_feedback))

    def get_arithmetic_action_result(self, future):
        action_status = future.result().status
        action_result = future.result().result
        if action_status == GoalStatus.STATUS_SUCCEEDED:
            self.get_logger().info('Action succeeded!')
            self.get_logger().info(
                'Action result(all formula): {0}'.format(action_result.all_formula))
            self.get_logger().info(
                'Action result(total sum): {0}'.format(action_result.total_sum))
        else:
            self.get_logger().warning(
                'Action failed with status: {0}'.format(action_status))

세번째 함수
feedback_msg는 액션 서버에서 전송한 피드백 메시지입니다. 그것을 지금 따로 만든것입니다.

네번째 함수
이것은 결과를 처리하는것으로 서버에서 반환한 목표의 상태(status)와 결과(result)를 포함합니다.
그리고 그것에 대한 결과를 도출하고 만약 안되는 경우 else로 반환하게 됩니다.

main 함수

import argparse
import sys

import rclpy

from py_test.checker.checker import Checker


def main(argv=sys.argv[1:]):
    parser = argparse.ArgumentParser(formatter_class=argparse.ArgumentDefaultsHelpFormatter)
    parser.add_argument(
        '-g',
        '--goal_total_sum',
        type=int,
        default=50,
        help='Target goal value of total sum')
    parser.add_argument(
        'argv', nargs=argparse.REMAINDER,
        help='Pass arbitrary arguments to the executable')
    args = parser.parse_args()

    rclpy.init(args=args.argv)
    try:
        checker = Checker()
        checker.send_goal_total_sum(args.goal_total_sum)
        try:
            rclpy.spin(checker)
        except KeyboardInterrupt:
            checker.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            checker.arithmetic_action_client.destroy()
            checker.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()

라이브러리
import argparse : argparse는 Python에서 명령줄 인자를 쉽게 처리할 수 있도록 돕는 표준 라이브러리입니다. 이것은 우리가 - help로 그런것으로 쉽게 판단할수 있도록 합니다.

import sys : 인터프리터를 제어하는 함수

main 함수
def main(argv=sys.argv[1:]) : 이거는 지금 명령줄로
python3 script.py -g 100 이거를 했을때 0이면 script.py가 들어오고 1: 이렇게 한 이유는 ["-g", "100"] 이게 들어오게 된다.

parser 부분 설정
ArgumentParser : 명령줄을 활용하는 개체를 생성하여
formatter_class=argparse.ArgumentDefaultsHelpFormatter 이걸로 --help의 기본값을 자동으로 추가합니다.

parser.add_argument( : 이부분에서 나오는 거는 지금 선택적인 명령줄 플래그로

python3 script.py -g 100
python3 script.py --goal_total_sum 200
python3 script.py  # (기본값 50 사용)

다음 parser.add_argument(도 유사하게 argv에 나머지 인자들은 저장하는 역할을 한다.

args = parser.parse_args() : 이거는 총 명령줄 인자들 읽고 각 정의된 인자에 맞게 파싱합니다.(그 위치 관계에 맞게 맞춘다고 생각)

python3 script.py -g 100 arg1 arg2 arg3

-> args.goal_total_sum == 100
-> args.argv == ['arg1', 'arg2', 'arg3']

이런식으로 사용 가능하다.

rclpy 부분 설정
rclpy.init(args=args.argv) : ros2의 시스템을 초기화 합니다.

checker = Checker()
checker.send_goal_total_sum(args.goal_total_sum)

checker 값을 정의하고 노드를 생성합니다.

메서드를 호출하고 액션 서버에 목표값을 설정하여 진행합니다.

그 이후 계속 진행되는 것입니다. 그 이후 spin을 돌고 끝내게 되면 노드를 제거합니다.


인터프리터 : 프로그래밍 언어의 소스 코드를 바로 실행하는 컴퓨터 프로그램 또는 환경

profile
#자기공부 #틀린것도많음 #자기개발 여러분 인생이 힘들다 하더라도 그것을 깨는 순간 큰 희열감으로 옵니다~

0개의 댓글