본 블로그 포스팅은 https://www.youtube.com/@pinklab_studio/playlists
에서 현재 강의 중에 있는
ROS2 무작정 따라하기강의의 내용을 필자가 다시 복기하여 기록하는 내용에 관한 것이다.
본 포스팅에서 다룰 내용은 ROS2 무작정 따라하기 강좌의 3-(8~10)
일정한 거리를 이동하는 액션 서버구현 이나, 그간 작성해온 응용 포스트를 살펴본다면
민형기 교수님이 강의하시는 강의내용을 응용 파트에 맞춰 필자가 좀 더 응용된 실습을 진행하고 있다.
따라서 이번 포스트도 실습 내용의 난이도를 좀 더 높였으며, 그 구성은 아래와 같다.
노드는 MultiThreadedExecutor
모드로
action_cli_node : 액션 클라이언트 노드로, act_dist_turtle명으로 구동중인 액션 서버에 접속하여 액션 목표(Goal)을 전달해 작업 목표를 설정하고, 작업 목표가 완료됫을 시 액션 결과(Resutl) 항목 중 result_dist
항목을 추출해 결과 확인
action_srv_node : act_dist_turtle명으로 액션 서버를 구동해 액션 목표(Goal)가 설정되면 설정된 액션 목표를 도달할 수 있도록 pub_and_sub_node에 터틀봇 제어 명령을 발행하라 지시
발행 지시방법은 IPC통신
: ipc_queue.state항목을 True
로 전환
액션 목표를 달성하기 위해 꾸준하게 pub_and_sub_node노드가 수집하는 터틀봇 위치/자세 정보를 기반으로 터틀봇의 이동거리 계산
위치/자세정보 수집방법은 : IPC통신
: ipc_queue.(pos_x, pos_y)에 기입된 정보를 수집하여 이동거리 계산
액션 목표를 달성하면 ipc_queue.state를 False
로 전환하여 터틀봇 제어 명령 발행을 중단, 목표를 달성한 순간의 터틀봇 위치/자세 정보 출력
액션 결과(Resutl) 항목 중 pos_x
, pos_y
, pos_theta
항목 출력
pub_and_sub_node : 터틀봇 제어 및 위치/자세정보 수집을 위한 노드로
퍼블리셔 /turtle1/cmd_vel를 통해 터틀봇 제어
섭스크라이버 /turtle1/pose를 통해 터틀봇 위치/자세 정보 확인 ipc_queue.(pos_x, pos_y)로 위치 정보 전달
노드는 3개, 메세지는 액션(사실상 3종), 토픽(2종)을 다루기에 하나의 패키지 - 노드 별 파이썬파일로 만드는 것이 더 합리적이긴 하나,
MultiThreadedExecutor
를 극한까지 사용하기 위해 프로세스간 통신 방법인IPC(Inter Process Communication)
까지 사용해보자
0) 전역 설정 부
import rclpy as rp
# 멀티스레딩 기능을 활성화 하기 위한 MultiThreadedExecutor
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
# Action기능을 쓰려면 별도의 ActionServer, ActionClient
# 라이브러리를 import 해야함
from rclpy.action import ActionServer
from rclpy.action import ActionClient
# 터틀봇의 위치/자세정보 취득을 위한 메세지
from turtlesim.msg import Pose
# 터틀봇의 제어명령 수행을 위한 메세지
from geometry_msgs.msg import Twist
# 커스텀 액션 메세지
from my_trd_pkg_msgs.action import DistTurtle
# 멀티 프로세서간 통신을 위한 Queue 라이브러리
from multiprocessing import Queue
import random, time, math
# Queue를 전역 변수로 선언하여 노드 간 공유
ipc_queue = Queue()
ipc_queue.state = False
ipc_queue.pos_x = 0.0
ipc_queue.pos_y = 0.0
ipc_queue.pos_theta = 0.0
앞서 IPC통신을 구현한다고 막 어렵게 썰을 풀었지만
사실상 구현하는 방식은 전역변수 형식으로 코드를 구현했다.
근데 그 전역변수 라는 것이 프로세스(노드)간 왓다갓다가 가능한 데이터 형식인
multiprocessing
라이브러리의 Queue
클래스 자료형으로 전역변수를 선언한 것 뿐이다.
따라서 어렵게 생각할 필요성이 없다.
1) main 함수 부
def main(args=None):
rp.init(args=args)
#ROS2 스케줄링 관리 단위 : Executor을 멀티 프로세싱 모드로 객체화
executor = MultiThreadedExecutor()
action_srv_node = ActionSrvNode()
action_cli_node = ActionCliNode()
pub_and_sub_node= PubSubNode()
# executor에 인스턴스화 한 노드항목 추가
executor.add_node(action_srv_node)
executor.add_node(action_cli_node)
executor.add_node(pub_and_sub_node)
try:
# cmd창에서 goal에 대한 숫자를 입력하면 float로 환
input_data = float(input("Enter goal distance: "))
# 여기서 Feature는 `action_type.Result` 타입의 객체이나,
# 따지고 보면 Service 클라이언트의 Req수행 후 반환되는 객체인
# Future 객체와 동일한 구성의 객체를 반환한다.
future = action_cli_node.send_goal(input_data)
# 액션 클라이언트로 Goal 설정 정보를 전달해서 나중에 send_goal_async로
# Goal 설정이 완료되면 goal_response_callback 함수를 호출 하게끔
# 액션 클라이언트의 상태정보를 관리하는 future메서드 내
# add_done_callback 메서드를 사용한다.
future.add_done_callback(action_cli_node.goal_response_callback)
executor.spin()
finally:
executor.shutdown()
pub_and_sub_node.destroy_node()
action_cli_node.destroy_node()
action_srv_node.destroy_node()
rp.shutdown()
if __name__ == '__main__()':
main()
다른것은 크게 어려움이 없는데
future
부분이 나오면서 살짝 코드가 난해해진다.
이 부분은 Action client 예제코드인
https://docs.ros.org/en/eloquent/Tutorials/Actions/Writing-a-Py-Action-Server-Client.html
이 부분을 참조하면서 보는게 조금 더 편하다
위 Action Client 예제의 사례를 가져와서
클라이언트의 Goal
이 설정되고, 또 설정된 Goal
을 달성햇을 시 각각의 콜백함수를 호출하는 코드를 응용했다...
이렇게 보면 된다.
2) 액션 클라이언트 설계 부
class ActionCliNode(Node): #액션 클라이언트 기능 설계
def __init__(self):
super().__init__('DistTurtleActCliNode')
self.action_client = ActionClient(
node=self,
action_type=DistTurtle,
action_name='act_dist_turtle'
)
# 사용자가 커맨드창에서 입력한 숫자는 `cmd_dist`로 받는다.
def send_goal(self, cmd_dist):
goal_msg = DistTurtle.Goal()
goal_msg.cmd_dist = cmd_dist
#서버가 구동될때까지 기다리기
self.action_client.wait_for_server()
# 액션 클라이언트가 설정된 goal_msg정보를 가져와서
# goal을 설정하게끔 서버에 비동기로 요청하는
# send_goal_async메서드를 사용하고 이를 호출한다.
# 이때 반환되는 정보는 'Future' 객체이다.
future = self.action_client.send_goal_async(goal_msg)
return future
# 앞서 main함수 구문에서 동작중인 future 객체를 가져와서
# Goal 설정이 잘 되엇는지?
# Goal 목표를 완수하면 그 결과를 가져오는 함수를 구현한다.
def goal_response_callback(self, future):
goal_handle = future.result()
# 골 설정 실페에 따른 예외처리 구문
if not goal_handle.accepted:
self.get_logger().info('Goal 설정실패')
return
self.get_logger().info('Goal 설정')
# 골 목표 완수에 따른 결과값을 호출하는 구문
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(
f'act_cli-최종 이동거리: {result.result_dist:.3f}')
액션 클라이언트 설계 부분에서는
send_goal
함수에서 설정된 Goal
값에 따라서 액션 서버가 목표를 설정하도록 send_goal_async
그 이후 목표를 달성했으면 달성 결과에 대한 Result
를 요청하는 get_result_async
두개의 비동기 호출 메서드를 설계했다.
3) 액션 서버 설계 부
class ActionSrvNode(Node): #액션서버 기능 설계
def __init__(self):
super().__init__('DistTurtleActSrvNode') # 노드명 기입
self.action_srv = ActionServer(
node=self,
action_type=DistTurtle,
action_name='act_dist_turtle',
execute_callback = self.action_srv_callback
)
#좌표 저장은 편하게 Pose 메세지 타입 클래스를 상숙해서 만들자
self.pre_pose = Pose()
self.record_dist = 0 #이동 거리는 누적형태로 저장함
def action_srv_callback(self, goal_handle):
# 액션 서버 콜백 함수에서 다룰 액션 메세지 타입 객체화
feedback_msg = DistTurtle.Feedback()
result_msg = DistTurtle.Result()
#설정된 골 정보를 가져옴
goal = goal_handle.request.cmd_dist
# cmd_turtle_msg의 발행주기를 관장하는 시간주기 변수 설정
diff_time_sec = 1.0
while goal > self.record_dist:
# goal이 설정 -> 터틀봇 제어 Pub가 발행되게
# ipc_queue.State를 False -> True로 변경
ipc_queue.state = True
#이동 거리 누적
self.record_dist += self.clac_dist()
#누적된 이동거리 값을 퍼를리시
feedback_msg.record_dist = self.record_dist
self.get_logger().info(
f'act_srv-누적 이동거리: {feedback_msg.record_dist:.3f}')
# 터틀봇이 현재까지 이동한 거리 정보 발행
goal_handle.publish_feedback(feedback_msg)
#제어메세지 발행 간격 정의
time.sleep(diff_time_sec)
# 터틀봇의 이동거리가 목표 이동거리에 근접하면 while문 종료
if goal - feedback_msg.record_dist < 0.2 :
break
# 액션 서버의 상태를 SUCCEED로 변경
goal_handle.succeed()
# 액션 서버 상태가 SUCCEED이니 state를 제어명령 발행 중지로 변경
ipc_queue.state = False
# 액션 서버가 Succeed(완수)상태이니 result 메세지 출력
result_msg.pos_x = ipc_queue.pos_x
result_msg.pos_y = ipc_queue.pos_y
result_msg.pos_theta = ipc_queue.pos_theta
result_msg.result_dist = feedback_msg.record_dist
self.get_logger().info(
f'act_srv-좌표: [x: {result_msg.pos_x:.3f}, ' +
f'y: {result_msg.pos_y:.3f}, ' +
f'theta: {result_msg.pos_theta:.4f}]')
return result_msg
# 누적 이동거리 계산 함수
def clac_dist(self):
if ipc_queue.state == False: #목표 설정이 안된 경우
self.pre_pose.x = ipc_queue.pos_x
self.pre_pose.y = ipc_queue.pos_y
return 0.0 #이동거리는 0으로 리턴함
else : #목표 설정이 되서 터틀봇이 제어되고 있는 경우
dist = math.sqrt((ipc_queue.pos_x - self.pre_pose.x)**2 +
(ipc_queue.pos_y - self.pre_pose.y)**2)
# 좌표 업데이트
self.pre_pose.x = ipc_queue.pos_x
self.pre_pose.y = ipc_queue.pos_y
return dist
액션 서버는 Goal
이 설정되면 그때부터 동작되는
execute_callback
함수인 action_srv_callback에서
터틀봇 제어 퍼블리셔가 구동되게끔 : ipc_queue.state = True
후 목표를 달성하기 위한 매 누적이동거리 계산 : clac_dist()
목표달성 조건 설정 : goal - feedback_msg.record_dist < 0.2
달성 후 퍼블리싱 중단 : ipc_queue.state = False
목표 달성 후 결과 출력 등을 수행한다.
4) 퍼블리셔 + 섭스크라이버 노드 부
class PubSubNode(Node): #터틀봇의 제어, 위치/자세 정보 수집 노드설계
def __init__(self):
super().__init__('TurtleStateNode')
# 퍼블리셔
self.publisher = self.create_publisher(
msg_type=Twist,
topic='/turtle1/cmd_vel',
qos_profile=10
)
# 섭스크라이버
self.subscriber = self.create_subscription(
msg_type=Pose,
topic='/turtle1/pose',
callback= self.callback_pose,
qos_profile=10,
)
# 퍼블리셔를 일정 주기로 발행하기 위한 타이머 콜백함수
self.timer_period = 1.0
self.timer = self.create_timer(
timer_period_sec=self.timer_period,
callback=self.timer_callback
)
# 섭스크라이버의 콜백함수
def callback_pose(self, msg):
ipc_queue.pos_x = msg.x
ipc_queue.pos_y = msg.y
ipc_queue.pos_theta = msg.theta
# 일정 주기로 퍼블리싱 하기 위한 타이머 콜백함
def timer_callback(self):
cmd_msg = Twist()
if ipc_queue.state == True:
#터틀봇 제어메세지(cmd_msg)에 데이터값 기입
cmd_msg.linear.x = random.random() * 2
cmd_msg.angular.z = random.random() * 2.5
#터틀봇 제어 정보를 발행하여 터틀봇 제어
self.publisher.publish(msg=cmd_msg)
이 부분은 토픽 퍼블리셔, 토픽 섭스크라이버를 함께 하나의 노드에 구현한 것이기에 그다지 어려울 것은 없고...
매 수신받은 터틀봇 위치/자세 정보를
전역 Queue에 넘겨주는 IPC
통신이랑
ipc_queue.state
조건에 따라 퍼블리싱 구동/비구동 의 코드만 추가되었다.
5) setup.py에 신규노드(파이썬 파일) 등록
마지막으로는 setup.py entry_point
에 'my_dist_turtle_node = my_trd_pkg.my_action_srv_cli:main'
를 쉼표와 함께 등록해주자
그 이후 패키지 빌드 및 등록 수행하면 된다.
$ colcon build --packages-select my_trd_pkg
$ r2pkgsetup
1번 bash : 작성한 노드 실행
$ ros2 run my_trd_pkg my_dist_turtle_node
2번 bash : 터틀심 노드 실행
$ humble
$ ros2 run turtlesim turtlesim_node
qrt_graph
를 확인해보면 설계한 노드가 각각 잘 구현된 것을 확인할 수 있다.
따로 Action
메세지를 echo
해서 확인하지는 않았지만
각 데이터 정보를 get_logger
메서드로 출력되게 만들었으니
동작은 다 잘 된다.. 라고 보면 된다.
혹시 모르니 전체 코드를 아래 첨부하겠다.
import rclpy as rp
# 멀티스레딩 기능을 활성화 하기 위한 MultiThreadedExecutor
from rclpy.executors import MultiThreadedExecutor
from rclpy.node import Node
# Action기능을 쓰려면 별도의 ActionServer, ActionClient
# 라이브러리를 import 해야함
from rclpy.action import ActionServer
from rclpy.action import ActionClient
# 터틀봇의 위치/자세정보 취득을 위한 메세지
from turtlesim.msg import Pose
# 터틀봇의 제어명령 수행을 위한 메세지
from geometry_msgs.msg import Twist
# 커스텀 액션 메세지
from my_trd_pkg_msgs.action import DistTurtle
# 멀티 프로세서간 통신을 위한 Queue 라이브러리
from multiprocessing import Queue
import random, time, math
# Queue를 전역 변수로 선언하여 노드 간 공유
ipc_queue = Queue()
ipc_queue.state = False
ipc_queue.pos_x = 0.0
ipc_queue.pos_y = 0.0
ipc_queue.pos_theta = 0.0
class ActionCliNode(Node): #액션 클라이언트 기능 설계
def __init__(self):
super().__init__('DistTurtleActCliNode')
self.action_client = ActionClient(
node=self,
action_type=DistTurtle,
action_name='act_dist_turtle'
)
# 사용자가 커맨드창에서 입력한 숫자는 `cmd_dist`로 받는다.
def send_goal(self, cmd_dist):
goal_msg = DistTurtle.Goal()
goal_msg.cmd_dist = cmd_dist
#서버가 구동될때까지 기다리기
self.action_client.wait_for_server()
# 액션 클라이언트가 설정된 goal_msg정보를 가져와서
# goal을 설정하게끔 서버에 비동기로 요청하는
# send_goal_async메서드를 사용하고 이를 호출한다.
# 이때 반환되는 정보는 'Future' 객체이다.
future = self.action_client.send_goal_async(goal_msg)
return future
# 앞서 main함수 구문에서 동작중인 future 객체를 가져와서
# Goal 설정이 잘 되엇는지?
# Goal 목표를 완수하면 그 결과를 가져오는 함수를 구현한다.
def goal_response_callback(self, future):
goal_handle = future.result()
# 골 설정 실페에 따른 예외처리 구문
if not goal_handle.accepted:
self.get_logger().info('Goal 설정실패')
return
self.get_logger().info('Goal 설정')
# 골 목표 완수에 따른 결과값을 호출하는 구문
result_future = goal_handle.get_result_async()
result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
self.get_logger().info(
f'act_cli-최종 이동거리: {result.result_dist:.3f}')
class ActionSrvNode(Node): #액션서버 기능 설계
def __init__(self):
super().__init__('DistTurtleActSrvNode') # 노드명 기입
self.action_srv = ActionServer(
node=self,
action_type=DistTurtle,
action_name='act_dist_turtle',
execute_callback = self.action_srv_callback
)
#좌표 저장은 편하게 Pose 메세지 타입 클래스를 상숙해서 만들자
self.pre_pose = Pose()
self.record_dist = 0 #이동 거리는 누적형태로 저장함
def action_srv_callback(self, goal_handle):
# 액션 서버 콜백 함수에서 다룰 액션 메세지 타입 객체화
feedback_msg = DistTurtle.Feedback()
result_msg = DistTurtle.Result()
#설정된 골 정보를 가져옴
goal = goal_handle.request.cmd_dist
# cmd_turtle_msg의 발행주기를 관장하는 시간주기 변수 설정
diff_time_sec = 1.0
while goal > self.record_dist:
# goal이 설정 -> 터틀봇 제어 Pub가 발행되게
# ipc_queue.State를 False -> True로 변경
ipc_queue.state = True
#이동 거리 누적
self.record_dist += self.clac_dist()
#누적된 이동거리 값을 퍼를리시
feedback_msg.record_dist = self.record_dist
self.get_logger().info(
f'act_srv-누적 이동거리: {feedback_msg.record_dist:.3f}')
# 터틀봇이 현재까지 이동한 거리 정보 발행
goal_handle.publish_feedback(feedback_msg)
#제어메세지 발행 간격 정의
time.sleep(diff_time_sec)
# 터틀봇의 이동거리가 목표 이동거리에 근접하면 while문 종료
if goal - feedback_msg.record_dist < 0.2 :
break
# 액션 서버의 상태를 SUCCEED로 변경
goal_handle.succeed()
# 액션 서버 상태가 SUCCEED이니 state를 제어명령 발행 중지로 변경
ipc_queue.state = False
# 액션 서버가 Succeed(완수)상태이니 result 메세지 출력
result_msg.pos_x = ipc_queue.pos_x
result_msg.pos_y = ipc_queue.pos_y
result_msg.pos_theta = ipc_queue.pos_theta
result_msg.result_dist = feedback_msg.record_dist
self.get_logger().info(
f'act_srv-좌표: [x: {result_msg.pos_x:.3f}, ' +
f'y: {result_msg.pos_y:.3f}, ' +
f'theta: {result_msg.pos_theta:.4f}]')
return result_msg
# 누적 이동거리 계산 함수
def clac_dist(self):
if ipc_queue.state == False: #목표 설정이 안된 경우
self.pre_pose.x = ipc_queue.pos_x
self.pre_pose.y = ipc_queue.pos_y
return 0.0 #이동거리는 0으로 리턴함
else : #목표 설정이 되서 터틀봇이 제어되고 있는 경우
dist = math.sqrt((ipc_queue.pos_x - self.pre_pose.x)**2 +
(ipc_queue.pos_y - self.pre_pose.y)**2)
# 좌표 업데이트
self.pre_pose.x = ipc_queue.pos_x
self.pre_pose.y = ipc_queue.pos_y
return dist
class PubSubNode(Node): #터틀봇의 제어, 위치/자세 정보 수집 노드설계
def __init__(self):
super().__init__('TurtleStateNode')
# 퍼블리셔
self.publisher = self.create_publisher(
msg_type=Twist,
topic='/turtle1/cmd_vel',
qos_profile=10
)
# 섭스크라이버
self.subscriber = self.create_subscription(
msg_type=Pose,
topic='/turtle1/pose',
callback= self.callback_pose,
qos_profile=10,
)
# 퍼블리셔를 일정 주기로 발행하기 위한 타이머 콜백함수
self.timer_period = 1.0
self.timer = self.create_timer(
timer_period_sec=self.timer_period,
callback=self.timer_callback
)
# 섭스크라이버의 콜백함수
def callback_pose(self, msg):
ipc_queue.pos_x = msg.x
ipc_queue.pos_y = msg.y
ipc_queue.pos_theta = msg.theta
# 일정 주기로 퍼블리싱 하기 위한 타이머 콜백함
def timer_callback(self):
cmd_msg = Twist()
if ipc_queue.state == True:
#터틀봇 제어메세지(cmd_msg)에 데이터값 기입
cmd_msg.linear.x = random.random() * 2
cmd_msg.angular.z = random.random() * 2.5
#터틀봇 제어 정보를 발행하여 터틀봇 제어
self.publisher.publish(msg=cmd_msg)
def main(args=None):
rp.init(args=args)
#ROS2 스케줄링 관리 단위 : Executor을 멀티 프로세싱 모드로 객체화
executor = MultiThreadedExecutor()
action_srv_node = ActionSrvNode()
action_cli_node = ActionCliNode()
pub_and_sub_node= PubSubNode()
# executor에 인스턴스화 한 노드항목 추가
executor.add_node(action_srv_node)
executor.add_node(action_cli_node)
executor.add_node(pub_and_sub_node)
try:
# cmd창에서 goal에 대한 숫자를 입력하면 float로 환
input_data = float(input("Enter goal distance: "))
# 여기서 Feature는 `action_type.Result` 타입의 객체이나,
# 따지고 보면 Service 클라이언트의 Req수행 후 반환되는 객체인
# Future 객체와 동일한 구성의 객체를 반환한다.
future = action_cli_node.send_goal(input_data)
# 액션 클라이언트로 Goal 설정 정보를 전달해서 나중에 send_goal_async로
# Goal 설정이 완료되면 goal_response_callback 함수를 호출 하게끔
# 액션 클라이언트의 상태정보를 관리하는 future메서드 내
# add_done_callback 메서드를 사용한다.
future.add_done_callback(action_cli_node.goal_response_callback)
executor.spin()
finally:
executor.shutdown()
pub_and_sub_node.destroy_node()
action_cli_node.destroy_node()
action_srv_node.destroy_node()
rp.shutdown()
if __name__ == '__main__()':
main()