[ROS2] 파이썬 패키지 설계 및 프로그래밍(토픽)

HY K·2024년 9월 20일
0

ROS2

목록 보기
12/18

이번에는 기초 프로그래밍을 응용해서, 토픽 / 서비스 / 액션 프로그래밍을 모두 이용하는 패키지를 설계하고 프로그래밍 해보자.
참고한 링크는 다음과 같다.
https://cafe.naver.com/openrt/24637
https://cafe.naver.com/openrt/24644
https://cafe.naver.com/openrt/24662
https://cafe.naver.com/openrt/24666
https://cafe.naver.com/openrt/24690
https://cafe.naver.com/openrt/24734


패키지 설계


위 이미지대로 작동하는 패키지를 한번 만들어볼 것이다.

  1. argument node
    -> arithmetic_argument 토픽으로 현재 시간, 변수 a,b 퍼블리시
  2. calculator node
    -> arithmetic_argument 토픽이 생성한 시간과 변수를 서브스크라이브
  3. operator node
    -> arithmetic_operator 서비스를 통해 calculator 노드에게 연산자를 요청
  4. calculator node
    -> 저장하고 잇는 변수 a와 b를 서비스 요청으로 받은 연산자를 이용하여 연산하고, 이후 결과값을 응답으로 보낸다
    -> 액션 목표값을 전달받은 이후부터는 연산 결과값을 누적하고 액션 피드백으로 연산식을 보낸다. 누적된 연산 결과값이 전달받은 목표값보다 커지면 액션 결과값으로 누적된 연산 결과값과 연산식 모두를 보낸다.
  5. checker node
    -> 연산 결과값의 누적 한계치를 액션 목표값으로 전달

전체 패키지 구조와 코드
https://github.com/robotpilot/ros2-seminar-examples/tree/main/topic_service_action_rclpy_example
이곳을 참조하도록 하자.

package 생성

$ cd robot_ws/src
$ ros2 pkg create topic_service_action_rclpy_example --build-type ament_python --dependencies rclpy std_msgs

package.xml

<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
  <name>topic_service_action_rclpy_example</name>
  <version>0.1.0</version>
  <description>TODO: Package description</description>
  <maintainer email="kimhoyun@todo.todo">kimhoyun</maintainer>
  <license>TODO: License declaration</license>

  <depend>rclpy</depend>
  <depend>std_msgs</depend>
  <depend>msg_srv_action_interface_example</depend>

  <test_depend>ament_copyright</test_depend>
  <test_depend>ament_flake8</test_depend>
  <test_depend>ament_pep257</test_depend>
  <test_depend>python3-pytest</test_depend>

  <export>
    <build_type>ament_python</build_type>
  </export>
</package>

setup.py

#!/usr/bin/env python3
# 쉐뱅을 통해 파이썬 인터프리터 지정

from setuptools import find_packages, setup
from setuptools import setup

import glob # 파일 경로를 통해 특정 확장자 파일을 찾을 때 유용
import os # 운영체제와 상호작용하기 위한 함수 제공

package_name = 'topic_service_action_rclpy_example'
share_dir = 'share/' + package_name

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
        (share_dir + '/launch', glob.glob(os.path.join('launch', '*.launch.py'))),
        (share_dir + '/param', glob.glob(os.path.join('param', '*.yaml'))),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='kimhoyun',
    maintainer_email='kimhoyun@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'argument = topic_service_action_rclpy_example.arithmetic.argument:main',
            'operator = topic_service_action_rclpy_example.arithmetic.operator:main',
            'calculator = topic_service_action_rclpy_example.calculator.main:main',
            'checker = topic_service_action_rclpy_example.checker.main:main',
        ],
    },
)

파이썬 토픽 프로그래밍

토픽은 단방향, 비동기식 메시지 송수신 방식으로, 토픽을 퍼블리시 하는 퍼블리셔와 서브스크라이브 하는 서브스크라이버로 구성된다.

전체 코드는 위에 있는 깃헙 링크 내에 들어가면 있으니, 그것을 보면 될 것이고 여기서는 토픽에 관련된 것만 살펴보겠다.

  • 퍼블리셔 코드
  • topic_service_action_rclpy_example/topic_service_action_rclpy_example/arithmetic/argument.py
import random

from msg_srv_action_interface_example.msg import ArithmeticArgument
from rcl_interfaces.msg import SetParametersResult
import rclpy
from rclpy.node import Node
from rclpy.parameter import Parameter
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy


class Argument(Node):

    def __init__(self):
        super().__init__('argument')
	(중략)

        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)

        self.arithmetic_argument_publisher = self.create_publisher(
            ArithmeticArgument,
            'arithmetic_argument',
            QOS_RKL10V)

        self.timer = self.create_timer(1.0, self.publish_random_arithmetic_arguments)
        
        (중략)
        
    def publish_random_arithmetic_arguments(self):
        msg = ArithmeticArgument()
        msg.stamp = self.get_clock().now().to_msg()
        msg.argument_a = float(random.randint(self.min_random_num, self.max_random_num))
        msg.argument_b = float(random.randint(self.min_random_num, self.max_random_num))
        self.arithmetic_argument_publisher.publish(msg)
        self.get_logger().info('Published argument a: {0}'.format(msg.argument_a))
        self.get_logger().info('Published argument b: {0}'.format(msg.argument_b))

QoSProfile 클래스를 통해서 토픽 퍼블리셔가 사용할 QoS 설정을 하는 것을 볼 수 있다. 또한 퍼블리셔를 선언해 어떤 인터페이스를 사용할 것인지, 어떤 이름으로 발행할 것인지, 그리고 콜백 함수는 얼마마다 호출할 것인지 등도 나타내고 있다.

그리고 아래 ** def publish_random_arithmetic_argument(self) 콜백함수를 통해서 현재 시간과 랜덤 실수로 정해지는 변수를 기입해 토픽으로 발행함을 알 수 있다.

  • 서브스크라이버 코드
  • topic_service_action_rclpy_example/topic_service_action_rclpy_example/calculatopr/calculator.py
class Calculator(Node):

    def __init__(self):
        super().__init__('calculator')
        self.argument_a = 0.0
        self.argument_b = 0.0
        self.callback_group = ReentrantCallbackGroup()

        (일부 코드 생략)

        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)

        self.arithmetic_argument_subscriber = self.create_subscription(
            ArithmeticArgument,
            'arithmetic_argument',
            self.get_arithmetic_argument,
            QOS_RKL10V,
            callback_group=self.callback_group)

    def get_arithmetic_argument(self, msg):
        self.argument_a = msg.argument_a
        self.argument_b = msg.argument_b
        self.get_logger().info('Subscribed at: {0}'.format(msg.stamp))
        self.get_logger().info('Subscribed argument a: {0}'.format(self.argument_a))
        self.get_logger().info('Subscribed argument b: {0}'.format(self.argument_b))

전체 코드 중에서 서브스크라이버에 해당하는 부분만 간추려놓았다.

QoS 설정을 똑같이 맞춰주고, 서브스크라이버를 선언하고 동일한 인터페이스, 동일한 토픽 이름 그리고 서브스크라이브 콜백 함수와 QoS 프로필을 선언한 것을 알 수 있다.

그리고 콜백함수에서는 변수들을 저장하고, 로깅을 통해 토픽을 받은 시간과 변수들을 화면에 출력하도록 지시하였다.

위 퍼블리셔와 서브스크라이버는 다음과 같이 실행이 가능하다.

$ ros2 run topic_service_action_rclpy_example calculator
$ ros2 run topic_service_action_rclpy_example argument

이 부분은

  • topic_service_action_rclpy_example/setup.py

위 파일에 위치한 entry_points를 보면 실행 노드로서 알 수 있다.

    entry_points={
        'console_scripts': [
            'argument = topic_service_action_rclpy_example.arithmetic.argument:main',
            'operator = topic_service_action_rclpy_example.arithmetic.operator:main',
            'calculator = topic_service_action_rclpy_example.calculator.main:main',
            'checker = topic_service_action_rclpy_example.checker.main:main',
        ],
    },

한가지 특이할만한 점에라면, calculator 노드에 병렬 스레드를 설정했다는 것이다.

import rclpy
from rclpy.executors import MultiThreadedExecutor

from topic_service_action_rclpy_example.calculator.calculator import Calculator


def main(args=None):
    rclpy.init(args=args)
    try:
        calculator = Calculator()
        executor = MultiThreadedExecutor(num_threads=4)
        executor.add_node(calculator)
        try:
            executor.spin()
        except KeyboardInterrupt:
            calculator.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            executor.shutdown()
            calculator.arithmetic_action_server.destroy()
            calculator.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()

이 부분은 4개의 스레드를 사용하는 Executor를 생성함으로써 토픽 퍼블리셔의 콜백 함수, 서비스 서버의 콜백 삼후, 액션 서버의 콜백 함수, 특정 타이머의 콜백 함수 등을 병렬 처리하도록 설정한 것이라고 보면된다.


profile
로봇, 드론, SLAM, 제어 공학 초보

0개의 댓글