PinkLAB - 민형기 ROS2 응용 실습 9일차 - Action Server & Client

안상훈·2024년 8월 30일
0

ROS2 실습

목록 보기
13/13
post-thumbnail

본 블로그 포스팅은 https://www.youtube.com/@pinklab_studio/playlists
에서 현재 강의 중에 있는

ROS2 무작정 따라하기강의의 내용을 필자가 다시 복기하여 기록하는 내용에 관한 것이다.


1. 작업개요

본 포스팅에서 다룰 내용은 ROS2 무작정 따라하기 강좌의 3-(8~10) 일정한 거리를 이동하는 액션 서버구현 이나, 그간 작성해온 응용 포스트를 살펴본다면

민형기 교수님이 강의하시는 강의내용을 응용 파트에 맞춰 필자가 좀 더 응용된 실습을 진행하고 있다.

따라서 이번 포스트도 실습 내용의 난이도를 좀 더 높였으며, 그 구성은 아래와 같다.

노드는 MultiThreadedExecutor 모드로

action_cli_node : 액션 클라이언트 노드로, act_dist_turtle명으로 구동중인 액션 서버에 접속하여 액션 목표(Goal)을 전달해 작업 목표를 설정하고, 작업 목표가 완료됫을 시 액션 결과(Resutl) 항목 중 result_dist 항목을 추출해 결과 확인

action_srv_node : act_dist_turtle명으로 액션 서버를 구동해 액션 목표(Goal)가 설정되면 설정된 액션 목표를 도달할 수 있도록 pub_and_sub_node에 터틀봇 제어 명령을 발행하라 지시

\rightarrow 발행 지시방법은 IPC통신 : ipc_queue.state항목을 True로 전환

액션 목표를 달성하기 위해 꾸준하게 pub_and_sub_node노드가 수집하는 터틀봇 위치/자세 정보를 기반으로 터틀봇의 이동거리 계산

\rightarrow 위치/자세정보 수집방법은 : IPC통신 : ipc_queue.(pos_x, pos_y)에 기입된 정보를 수집하여 이동거리 계산

액션 목표를 달성하면 ipc_queue.stateFalse로 전환하여 터틀봇 제어 명령 발행을 중단, 목표를 달성한 순간의 터틀봇 위치/자세 정보 출력

\rightarrow 액션 결과(Resutl) 항목 중 pos_x, pos_y, pos_theta 항목 출력

pub_and_sub_node : 터틀봇 제어 및 위치/자세정보 수집을 위한 노드로
퍼블리셔 /turtle1/cmd_vel를 통해 터틀봇 제어
섭스크라이버 /turtle1/pose를 통해 터틀봇 위치/자세 정보 확인 \rightarrow ipc_queue.(pos_x, pos_y)로 위치 정보 전달

노드는 3개, 메세지는 액션(사실상 3종), 토픽(2종)을 다루기에 하나의 패키지 - 노드파이썬파일로 만드는 것이 더 합리적이긴 하나,

MultiThreadedExecutor를 극한까지 사용하기 위해 프로세스간 통신 방법인IPC(Inter Process Communication)까지 사용해보자



2. 코드리뷰

0) 전역 설정 부

import rclpy as rp

# 멀티스레딩 기능을 활성화 하기 위한 MultiThreadedExecutor
from rclpy.executors import MultiThreadedExecutor

from rclpy.node import Node

# Action기능을 쓰려면 별도의 ActionServer, ActionClient
# 라이브러리를 import 해야함
from rclpy.action import ActionServer
from rclpy.action import ActionClient

# 터틀봇의 위치/자세정보 취득을 위한 메세지
from turtlesim.msg import Pose
# 터틀봇의 제어명령 수행을 위한 메세지
from geometry_msgs.msg import Twist
# 커스텀 액션 메세지
from my_trd_pkg_msgs.action import DistTurtle

# 멀티 프로세서간 통신을 위한 Queue 라이브러리
from multiprocessing import Queue
import random, time, math


# Queue를 전역 변수로 선언하여 노드 간 공유
ipc_queue = Queue()
ipc_queue.state = False
ipc_queue.pos_x = 0.0
ipc_queue.pos_y = 0.0
ipc_queue.pos_theta = 0.0

앞서 IPC통신을 구현한다고 막 어렵게 썰을 풀었지만
사실상 구현하는 방식은 전역변수 형식으로 코드를 구현했다.

근데 그 전역변수 라는 것이 프로세스(노드)간 왓다갓다가 가능한 데이터 형식인

multiprocessing 라이브러리의 Queue클래스 자료형으로 전역변수를 선언한 것 뿐이다.

따라서 어렵게 생각할 필요성이 없다.


1) main 함수 부

def main(args=None):
    rp.init(args=args)

    #ROS2 스케줄링 관리 단위 : Executor을 멀티 프로세싱 모드로 객체화
    executor = MultiThreadedExecutor()

    action_srv_node = ActionSrvNode()
    action_cli_node = ActionCliNode()
    pub_and_sub_node= PubSubNode()

    # executor에 인스턴스화 한 노드항목 추가
    executor.add_node(action_srv_node)
    executor.add_node(action_cli_node)
    executor.add_node(pub_and_sub_node)

    try:
        # cmd창에서 goal에 대한 숫자를 입력하면 float로 환
        input_data = float(input("Enter goal distance: "))

        # 여기서 Feature는 `action_type.Result` 타입의 객체이나,
        # 따지고 보면 Service 클라이언트의 Req수행 후 반환되는 객체인
        # Future 객체와 동일한 구성의 객체를 반환한다.
        future = action_cli_node.send_goal(input_data)

        # 액션 클라이언트로 Goal 설정 정보를 전달해서 나중에 send_goal_async로
        # Goal 설정이 완료되면 goal_response_callback 함수를 호출 하게끔
        # 액션 클라이언트의 상태정보를 관리하는 future메서드 내
        # add_done_callback 메서드를 사용한다.
        future.add_done_callback(action_cli_node.goal_response_callback)

        executor.spin()

    finally:
        executor.shutdown()

        pub_and_sub_node.destroy_node()
        action_cli_node.destroy_node()
        action_srv_node.destroy_node()

        rp.shutdown()


if __name__ == '__main__()':
    main()

다른것은 크게 어려움이 없는데
future 부분이 나오면서 살짝 코드가 난해해진다.

이 부분은 Action client 예제코드인

https://docs.ros.org/en/eloquent/Tutorials/Actions/Writing-a-Py-Action-Server-Client.html

이 부분을 참조하면서 보는게 조금 더 편하다

위 Action Client 예제의 사례를 가져와서
클라이언트의 Goal이 설정되고, 또 설정된 Goal을 달성햇을 시 각각의 콜백함수를 호출하는 코드를 응용했다...

이렇게 보면 된다.


2) 액션 클라이언트 설계 부

class ActionCliNode(Node): #액션 클라이언트 기능 설계
    def __init__(self):
        super().__init__('DistTurtleActCliNode')
        self.action_client = ActionClient(
            node=self,
            action_type=DistTurtle,
            action_name='act_dist_turtle'
        )

    # 사용자가 커맨드창에서 입력한 숫자는 `cmd_dist`로 받는다.
    def send_goal(self, cmd_dist):
        goal_msg = DistTurtle.Goal()
        goal_msg.cmd_dist = cmd_dist

        #서버가 구동될때까지 기다리기
        self.action_client.wait_for_server() 

        # 액션 클라이언트가 설정된 goal_msg정보를 가져와서
        # goal을 설정하게끔 서버에 비동기로 요청하는 
        # send_goal_async메서드를 사용하고 이를 호출한다.
        # 이때 반환되는 정보는 'Future' 객체이다.
        future = self.action_client.send_goal_async(goal_msg)
        return future
    
    # 앞서 main함수 구문에서 동작중인 future 객체를 가져와서
    # Goal 설정이 잘 되엇는지? 
    # Goal 목표를 완수하면 그 결과를 가져오는 함수를 구현한다.
    def goal_response_callback(self, future):
        goal_handle = future.result()
        # 골 설정 실페에 따른 예외처리 구문
        if not goal_handle.accepted:
            self.get_logger().info('Goal 설정실패')
            return
        self.get_logger().info('Goal 설정')

        # 골 목표 완수에 따른 결과값을 호출하는 구문
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info(
            f'act_cli-최종 이동거리: {result.result_dist:.3f}')

액션 클라이언트 설계 부분에서는

send_goal 함수에서 설정된 Goal값에 따라서 액션 서버가 목표를 설정하도록 send_goal_async

그 이후 목표를 달성했으면 달성 결과에 대한 Result를 요청하는 get_result_async 두개의 비동기 호출 메서드를 설계했다.


3) 액션 서버 설계 부

class ActionSrvNode(Node): #액션서버 기능 설계
    def __init__(self):
        super().__init__('DistTurtleActSrvNode')    # 노드명 기입

        self.action_srv = ActionServer(
            node=self,
            action_type=DistTurtle,
            action_name='act_dist_turtle',
            execute_callback = self.action_srv_callback
        )

        #좌표 저장은 편하게 Pose 메세지 타입 클래스를 상숙해서 만들자
        self.pre_pose = Pose()
        self.record_dist = 0 #이동 거리는 누적형태로 저장함

    def action_srv_callback(self, goal_handle):
        # 액션 서버 콜백 함수에서 다룰 액션 메세지 타입 객체화
        feedback_msg = DistTurtle.Feedback()
        result_msg = DistTurtle.Result()

        #설정된 골 정보를 가져옴
        goal = goal_handle.request.cmd_dist

        # cmd_turtle_msg의 발행주기를 관장하는 시간주기 변수 설정
        diff_time_sec = 1.0

        while goal > self.record_dist:
            # goal이 설정 -> 터틀봇 제어 Pub가 발행되게
            # ipc_queue.State를 False -> True로 변경
            ipc_queue.state = True

            #이동 거리 누적
            self.record_dist += self.clac_dist()
            #누적된 이동거리 값을 퍼를리시
            feedback_msg.record_dist = self.record_dist
            self.get_logger().info(
                f'act_srv-누적 이동거리: {feedback_msg.record_dist:.3f}')

            # 터틀봇이 현재까지 이동한 거리 정보 발행
            goal_handle.publish_feedback(feedback_msg)

            #제어메세지 발행 간격 정의
            time.sleep(diff_time_sec)

            # 터틀봇의 이동거리가 목표 이동거리에 근접하면 while문 종료
            if goal - feedback_msg.record_dist < 0.2 :
                break
        
        # 액션 서버의 상태를 SUCCEED로 변경
        goal_handle.succeed()
        # 액션 서버 상태가 SUCCEED이니 state를 제어명령 발행 중지로 변경
        ipc_queue.state = False

        # 액션 서버가 Succeed(완수)상태이니 result 메세지 출력
        result_msg.pos_x = ipc_queue.pos_x
        result_msg.pos_y = ipc_queue.pos_y
        result_msg.pos_theta = ipc_queue.pos_theta
        result_msg.result_dist = feedback_msg.record_dist

        self.get_logger().info(
            f'act_srv-좌표: [x: {result_msg.pos_x:.3f}, ' + 
            f'y: {result_msg.pos_y:.3f}, ' + 
            f'theta: {result_msg.pos_theta:.4f}]')

        return result_msg
    
    # 누적 이동거리 계산 함수
    def clac_dist(self):
        if ipc_queue.state == False: #목표 설정이 안된 경우
            self.pre_pose.x = ipc_queue.pos_x
            self.pre_pose.y = ipc_queue.pos_y
            return 0.0 #이동거리는 0으로 리턴함

        else : #목표 설정이 되서 터틀봇이 제어되고 있는 경우
            dist = math.sqrt((ipc_queue.pos_x - self.pre_pose.x)**2 + 
                             (ipc_queue.pos_y - self.pre_pose.y)**2)
            
            # 좌표 업데이트
            self.pre_pose.x = ipc_queue.pos_x
            self.pre_pose.y = ipc_queue.pos_y
            return dist

액션 서버는 Goal이 설정되면 그때부터 동작되는
execute_callback함수인 action_srv_callback에서

터틀봇 제어 퍼블리셔가 구동되게끔 : ipc_queue.state = True

후 목표를 달성하기 위한 매 누적이동거리 계산 : clac_dist()

목표달성 조건 설정 : goal - feedback_msg.record_dist < 0.2

달성 후 퍼블리싱 중단 : ipc_queue.state = False

목표 달성 후 결과 출력 등을 수행한다.


4) 퍼블리셔 + 섭스크라이버 노드 부

class PubSubNode(Node): #터틀봇의 제어, 위치/자세 정보 수집 노드설계
    def __init__(self):
        super().__init__('TurtleStateNode')
        
        # 퍼블리셔
        self.publisher = self.create_publisher(
            msg_type=Twist,
            topic='/turtle1/cmd_vel',
            qos_profile=10
        )
        # 섭스크라이버
        self.subscriber = self.create_subscription(
            msg_type=Pose,
            topic='/turtle1/pose',
            callback= self.callback_pose,
            qos_profile=10,
        )
        # 퍼블리셔를 일정 주기로 발행하기 위한 타이머 콜백함수
        self.timer_period = 1.0
        self.timer = self.create_timer(
            timer_period_sec=self.timer_period,
            callback=self.timer_callback
        )
    
    # 섭스크라이버의 콜백함수
    def callback_pose(self, msg):
        ipc_queue.pos_x = msg.x
        ipc_queue.pos_y = msg.y
        ipc_queue.pos_theta = msg.theta

    # 일정 주기로 퍼블리싱 하기 위한 타이머 콜백함
    def timer_callback(self):
        cmd_msg = Twist()
        if ipc_queue.state == True:
            #터틀봇 제어메세지(cmd_msg)에 데이터값 기입
            cmd_msg.linear.x = random.random() * 2
            cmd_msg.angular.z = random.random() * 2.5

            #터틀봇 제어 정보를 발행하여 터틀봇 제어
            self.publisher.publish(msg=cmd_msg)

이 부분은 토픽 퍼블리셔, 토픽 섭스크라이버를 함께 하나의 노드에 구현한 것이기에 그다지 어려울 것은 없고...

매 수신받은 터틀봇 위치/자세 정보를

전역 Queue에 넘겨주는 IPC통신이랑

ipc_queue.state 조건에 따라 퍼블리싱 구동/비구동 의 코드만 추가되었다.


5) setup.py에 신규노드(파이썬 파일) 등록

마지막으로는 setup.py \rightarrow entry_point'my_dist_turtle_node = my_trd_pkg.my_action_srv_cli:main'를 쉼표와 함께 등록해주자

그 이후 패키지 빌드 및 등록 수행하면 된다.

$ colcon build --packages-select my_trd_pkg
$ r2pkgsetup


3. 코드 구동

1번 bash : 작성한 노드 실행

$ ros2 run my_trd_pkg my_dist_turtle_node

2번 bash : 터틀심 노드 실행

$ humble
$ ros2 run turtlesim turtlesim_node

qrt_graph를 확인해보면 설계한 노드가 각각 잘 구현된 것을 확인할 수 있다.

따로 Action메세지를 echo해서 확인하지는 않았지만
각 데이터 정보를 get_logger메서드로 출력되게 만들었으니
동작은 다 잘 된다.. 라고 보면 된다.

혹시 모르니 전체 코드를 아래 첨부하겠다.

import rclpy as rp

# 멀티스레딩 기능을 활성화 하기 위한 MultiThreadedExecutor
from rclpy.executors import MultiThreadedExecutor

from rclpy.node import Node

# Action기능을 쓰려면 별도의 ActionServer, ActionClient
# 라이브러리를 import 해야함
from rclpy.action import ActionServer
from rclpy.action import ActionClient

# 터틀봇의 위치/자세정보 취득을 위한 메세지
from turtlesim.msg import Pose
# 터틀봇의 제어명령 수행을 위한 메세지
from geometry_msgs.msg import Twist
# 커스텀 액션 메세지
from my_trd_pkg_msgs.action import DistTurtle

# 멀티 프로세서간 통신을 위한 Queue 라이브러리
from multiprocessing import Queue
import random, time, math


# Queue를 전역 변수로 선언하여 노드 간 공유
ipc_queue = Queue()
ipc_queue.state = False
ipc_queue.pos_x = 0.0
ipc_queue.pos_y = 0.0
ipc_queue.pos_theta = 0.0

class ActionCliNode(Node): #액션 클라이언트 기능 설계
    def __init__(self):
        super().__init__('DistTurtleActCliNode')
        self.action_client = ActionClient(
            node=self,
            action_type=DistTurtle,
            action_name='act_dist_turtle'
        )

    # 사용자가 커맨드창에서 입력한 숫자는 `cmd_dist`로 받는다.
    def send_goal(self, cmd_dist):
        goal_msg = DistTurtle.Goal()
        goal_msg.cmd_dist = cmd_dist

        #서버가 구동될때까지 기다리기
        self.action_client.wait_for_server() 

        # 액션 클라이언트가 설정된 goal_msg정보를 가져와서
        # goal을 설정하게끔 서버에 비동기로 요청하는 
        # send_goal_async메서드를 사용하고 이를 호출한다.
        # 이때 반환되는 정보는 'Future' 객체이다.
        future = self.action_client.send_goal_async(goal_msg)
        return future
    
    # 앞서 main함수 구문에서 동작중인 future 객체를 가져와서
    # Goal 설정이 잘 되엇는지? 
    # Goal 목표를 완수하면 그 결과를 가져오는 함수를 구현한다.
    def goal_response_callback(self, future):
        goal_handle = future.result()
        # 골 설정 실페에 따른 예외처리 구문
        if not goal_handle.accepted:
            self.get_logger().info('Goal 설정실패')
            return
        self.get_logger().info('Goal 설정')

        # 골 목표 완수에 따른 결과값을 호출하는 구문
        result_future = goal_handle.get_result_async()
        result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        self.get_logger().info(
            f'act_cli-최종 이동거리: {result.result_dist:.3f}')


class ActionSrvNode(Node): #액션서버 기능 설계
    def __init__(self):
        super().__init__('DistTurtleActSrvNode')    # 노드명 기입

        self.action_srv = ActionServer(
            node=self,
            action_type=DistTurtle,
            action_name='act_dist_turtle',
            execute_callback = self.action_srv_callback
        )

        #좌표 저장은 편하게 Pose 메세지 타입 클래스를 상숙해서 만들자
        self.pre_pose = Pose()
        self.record_dist = 0 #이동 거리는 누적형태로 저장함

    def action_srv_callback(self, goal_handle):
        # 액션 서버 콜백 함수에서 다룰 액션 메세지 타입 객체화
        feedback_msg = DistTurtle.Feedback()
        result_msg = DistTurtle.Result()

        #설정된 골 정보를 가져옴
        goal = goal_handle.request.cmd_dist

        # cmd_turtle_msg의 발행주기를 관장하는 시간주기 변수 설정
        diff_time_sec = 1.0

        while goal > self.record_dist:
            # goal이 설정 -> 터틀봇 제어 Pub가 발행되게
            # ipc_queue.State를 False -> True로 변경
            ipc_queue.state = True

            #이동 거리 누적
            self.record_dist += self.clac_dist()
            #누적된 이동거리 값을 퍼를리시
            feedback_msg.record_dist = self.record_dist
            self.get_logger().info(
                f'act_srv-누적 이동거리: {feedback_msg.record_dist:.3f}')

            # 터틀봇이 현재까지 이동한 거리 정보 발행
            goal_handle.publish_feedback(feedback_msg)

            #제어메세지 발행 간격 정의
            time.sleep(diff_time_sec)

            # 터틀봇의 이동거리가 목표 이동거리에 근접하면 while문 종료
            if goal - feedback_msg.record_dist < 0.2 :
                break
        
        # 액션 서버의 상태를 SUCCEED로 변경
        goal_handle.succeed()
        # 액션 서버 상태가 SUCCEED이니 state를 제어명령 발행 중지로 변경
        ipc_queue.state = False

        # 액션 서버가 Succeed(완수)상태이니 result 메세지 출력
        result_msg.pos_x = ipc_queue.pos_x
        result_msg.pos_y = ipc_queue.pos_y
        result_msg.pos_theta = ipc_queue.pos_theta
        result_msg.result_dist = feedback_msg.record_dist

        self.get_logger().info(
            f'act_srv-좌표: [x: {result_msg.pos_x:.3f}, ' + 
            f'y: {result_msg.pos_y:.3f}, ' + 
            f'theta: {result_msg.pos_theta:.4f}]')

        return result_msg
    
    # 누적 이동거리 계산 함수
    def clac_dist(self):
        if ipc_queue.state == False: #목표 설정이 안된 경우
            self.pre_pose.x = ipc_queue.pos_x
            self.pre_pose.y = ipc_queue.pos_y
            return 0.0 #이동거리는 0으로 리턴함

        else : #목표 설정이 되서 터틀봇이 제어되고 있는 경우
            dist = math.sqrt((ipc_queue.pos_x - self.pre_pose.x)**2 + 
                             (ipc_queue.pos_y - self.pre_pose.y)**2)
            
            # 좌표 업데이트
            self.pre_pose.x = ipc_queue.pos_x
            self.pre_pose.y = ipc_queue.pos_y
            return dist


class PubSubNode(Node): #터틀봇의 제어, 위치/자세 정보 수집 노드설계
    def __init__(self):
        super().__init__('TurtleStateNode')
        
        # 퍼블리셔
        self.publisher = self.create_publisher(
            msg_type=Twist,
            topic='/turtle1/cmd_vel',
            qos_profile=10
        )
        # 섭스크라이버
        self.subscriber = self.create_subscription(
            msg_type=Pose,
            topic='/turtle1/pose',
            callback= self.callback_pose,
            qos_profile=10,
        )
        # 퍼블리셔를 일정 주기로 발행하기 위한 타이머 콜백함수
        self.timer_period = 1.0
        self.timer = self.create_timer(
            timer_period_sec=self.timer_period,
            callback=self.timer_callback
        )
    
    # 섭스크라이버의 콜백함수
    def callback_pose(self, msg):
        ipc_queue.pos_x = msg.x
        ipc_queue.pos_y = msg.y
        ipc_queue.pos_theta = msg.theta

    # 일정 주기로 퍼블리싱 하기 위한 타이머 콜백함
    def timer_callback(self):
        cmd_msg = Twist()
        if ipc_queue.state == True:
            #터틀봇 제어메세지(cmd_msg)에 데이터값 기입
            cmd_msg.linear.x = random.random() * 2
            cmd_msg.angular.z = random.random() * 2.5

            #터틀봇 제어 정보를 발행하여 터틀봇 제어
            self.publisher.publish(msg=cmd_msg)



def main(args=None):
    rp.init(args=args)

    #ROS2 스케줄링 관리 단위 : Executor을 멀티 프로세싱 모드로 객체화
    executor = MultiThreadedExecutor()

    action_srv_node = ActionSrvNode()
    action_cli_node = ActionCliNode()
    pub_and_sub_node= PubSubNode()

    # executor에 인스턴스화 한 노드항목 추가
    executor.add_node(action_srv_node)
    executor.add_node(action_cli_node)
    executor.add_node(pub_and_sub_node)

    try:
        # cmd창에서 goal에 대한 숫자를 입력하면 float로 환
        input_data = float(input("Enter goal distance: "))

        # 여기서 Feature는 `action_type.Result` 타입의 객체이나,
        # 따지고 보면 Service 클라이언트의 Req수행 후 반환되는 객체인
        # Future 객체와 동일한 구성의 객체를 반환한다.
        future = action_cli_node.send_goal(input_data)

        # 액션 클라이언트로 Goal 설정 정보를 전달해서 나중에 send_goal_async로
        # Goal 설정이 완료되면 goal_response_callback 함수를 호출 하게끔
        # 액션 클라이언트의 상태정보를 관리하는 future메서드 내
        # add_done_callback 메서드를 사용한다.
        future.add_done_callback(action_cli_node.goal_response_callback)

        executor.spin()

    finally:
        executor.shutdown()

        pub_and_sub_node.destroy_node()
        action_cli_node.destroy_node()
        action_srv_node.destroy_node()

        rp.shutdown()


if __name__ == '__main__()':
    main()
profile
자율차 공부중

0개의 댓글

관련 채용 정보