토픽 통신 개념
토픽 통신 예시
$ rqt_graph
rqt_graph 시각화 예시
유용한 명령어
$ ros2 node list: 현재 동작하고 있는 node의 목록$ ros2 topic list: 현재 동작하고 있는 topic의 목록$ ros2 service list: 현재 동작하고 있는 service의 목록$ ros2 action list: 현재 동작하고 있는 action의 목록토픽 관련 유용한 명령어
$ ros2 topic info /tutle1/cmd_vel: "/tutle1/cmd_vel"이라는 토픽의 정보를 출력

$ ros2 topic echo /tutle1/cmd_vel: "/tutle1/cmd_vel"이라는 토픽에 현재 담긴 데이터를 실시간으로 출력

$ ros2 topic pub --once /turtle1/cmd_vel geometry_msg/msg/Twist “linear: {x: 2.0, y: 0.0, z: 0.0}, angular: {x: 0.0, y: 0.0, z: 0.0}“ : "geometry_msg/msg/Twist"라는 메세지 타입으로 만들어진 "/tutle1/cmd_vel"이라는 토픽에 "x: 2.0"이라는 값을 담아서 publish함. 굳이 소스코드를 작성하지 않고 간단하게 command line에서 특정 토픽을 publish하는 방법으로, 노드의 동작을 간단한게 테스트할 때 유용함
- ros2 topic pub <옵션> <토픽이름> <메세지타입> <입력할 데이터>의 형태

$ ros2 topic hz <토픽이름>: 해당 토픽의 발행 주기(Hz)를 출력
$ ros2 topic delay <토픽이름>: 해당 토픽의 네트워크 지연 시간을 출력
$ ros2 topic bw <토픽이름>: 해당 토픽의 bandwidth를 출력
우선 실습을 진행할 작업공간(workspace)를 생성함
작업 공간은 어떤 프로젝트라고 생각하면 됨
자율주행차 만들기 프로젝트(작업공간) 내에 여러 패키지(인지, 판단, 제어)가 존재하며, 각 패키지를 구성하는 세부 소프트웨어 모듈인 노드(SLAM, 객체 검출, RRT 탐색 등)가 존재
아래 명령어를 실행
$ mkdir my_ws
$ cd my_ws
$ mkdir src
$ cd src
ros2 pkg create <만들 패키지 이름> --build-type <빌드 타입> --dependencies <의존성 목록>$ ros2 pkg create my_rclpy_package --build-type ament_python --dependencies rclpy std_msgs
패키지 생성 결과로 여러 파일들이 생성됨
$ cd my_rclpy_package, $ls 명령어를 통해 위 명령어를 통해 생성된 파일과 디렉토리를 볼 수 있음
"my_rclpy_package/": 패키지 이름과 같은 이름의 디렉토리가 생성되며 그 안에는 "init.py"파일이 하나 있음. 보통 이 디렉토리 안에 노드를 정의한 파이썬 파일들을 모아놓음
"package.xml": 패키지 설정파일. 추후 의존성 등을 추가할 때 이 파일에 잘 추가해야 빌드가 성공적으로 이루어짐
"setup.py": 설정파일. 마찬가지로 의존성 등을 잘 추가해야하며, 특히 "entry_point"라는 필드는 나중에 노드를 실행시키는 ros2 run 또는 ros2 launch 명령어가 어떤 파일의 어떤 함수를 실행시킬지 명시하는 곳이므로 주의해서 작성해야함
"setup.cfg": 빌드 후 실행 파일의 설치 경로
ROS에서 패키지 및 파일 이름 등은 snake_case를 class는 CamelCase를 따라 이름을 작성하며, msg, srv, action 파일 등은 빌드 후 class가 되므로 CamelCase로 이름을 작성
$ cd ~/my_ws/src/my_rclpy_package/my_rclpy_package
$ code helloworld_publisher.py
import rclpy #ROS python 기본 라이브러리
from rclpy.node import Node #rclpy내 Node 클래스를 import. Node clas 내에는 node를 정의하고 사용하기 위한 유용한 메소드들이 구현되어 있어 이를 상속받아 사용
from rclpy.qos import QoSProfile #네트워크 관련 내용
from std_msgs.msg import String # std_msgs 내의 String 메세지 타입을 사용
class HelloworldPublisher(Node): #Node 클래스를 상속
def __init__(self):
super().__init__('helloworld_publisher') # node 이름 정의
qos_profile = QoSProfile(depth=10) # 네트워크 설정. 네트워크가 불안정할 경우 데이터를 버리지 말고 10개까지 버퍼에 저장
self.helloworld_publisher = self.create_publisher(String, 'helloworld', qos_profile) # publisher 선언. 입력인자로 (<메세지타입>, <토픽이름>, <네트워크 설정>)을 입력
self.timer = self.create_timer(1, self.publish_helloworld_msg) # 타이머. 1초마다 self.publish_helloworld_msg 함수가 실행됨
self.count = 0
def publish_helloworld_msg(self): # 토픽에 데이터를 담고, 실제 publish하는 함수
msg = String() #메세지 타입을 선언
msg.data = 'Hello World: {0}'.format(self.count) #메세지 타입이 string임으로 문자열을 msg의 data 필드에 입력
self.helloworld_publisher.publish(msg) #토픽을 publish
self.get_logger().info('Published message: {0}'.format(msg.data)) #publish한 데이터를 확인할 수 있도록 출력
self.count += 1
def main(args=None): # 메인함수. 추후 setup.py의 entry_points 필드에서 노드 실행시 메인함수가 실행되도록 설정할 예정
rclpy.init(args=args) #초기화
node = HelloworldPublisher() #위에서 정의한 publisher class 선언
try:
rclpy.spin(node) #publisher는 끝나지 말고 데이터를 계속 발행해줘야하므로, 무한루프에 넣음
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)') #KeyboardInterrupt발생시 해당 에러 메세지를 출력
finally:
node.destroy_node() # 종료
rclpy.shutdown() # 종료
if __name__ == '__main__':
main()
$ code helloworld_subscriber.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
class HelloworldSubscriber(Node):
def __init__(self):
super().__init__('Helloworld_subscriber')
qos_profile = QoSProfile(depth=10)
self.helloworld_subscriber = self.create_subscription(
String,
'helloworld',
self.subscribe_topic_message,
qos_profile) # subscriber 선언. (<수신할 데이터 타입>, <수신할 토픽이름>, <콜백함수>, <네트워크설정>)을 입력. 콜백함수는 subscriber가 토픽을 수신할 때마다 실행되는 함수임
def subscribe_topic_message(self, msg): # 콜백함수
self.get_logger().info('Received message: {0}'.format(msg.data)) #수신한 메세지(msg)의 데이터를 그대로 출력
def main(args=None):
rclpy.init(args=args)
node = HelloworldSubscriber()
try:
rclpy.spin(node) #subscriber 또한 끝나지말고 계속 토픽을 수신해야함으로 spin에 넣어 무한루프
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
$ cd ~/my_ws/src/my_rclpy_package
$ code setup.py
helloworld_publisher = my_rclpy_package.helloworld_publisher:main: "helloworld_publisher" 노드 실행시, "my_rclpy_package"패키지의 "helloworld_publisher"파일의 "main"함수 실행from setuptools import find_packages, setup
package_name = 'my_rclpy_package'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='kym',
maintainer_email='kym@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'helloworld_publisher = my_rclpy_package.helloworld_publisher:main',
'helloworld_subscriber = my_rclpy_package.helloworld_subscriber:main',
],
},
)
새로운 코드 등이 추가되었다면 빌드를 해주어야 사용 가능
빌드는 반드시 작업공간의 루트에서
$ cd ~/my_ws
$ colcon build: 전체 패키지를 빌드$ colcon build --symlink-install: 전체 패키지를 빌드하지만, 설치파일을 install 폴더에 복사하는 것이 아닌 심볼릭 파일만 생성. 심볼릭 파일은 윈도우의 바로가기 파일과 비슷. 그냥 빌드하면 코드가 바뀌었을 때 설치파일의 내용과 달리지므로 다시 빌드해야하지만, 심볼릭 파일을 사용했을 경우 해당 코드와 연결만 해주기 때문에 다시 빌드하지 않아도 괜찮음$ colcon build --symlink-install --package-select <패키지이름>: 특정 패키지만 빌드. 하나의 패키지만 업데이트 했는데, 전체를 다시 빌드하면 오래걸림 $ colcon build --symlink-install --package-up-to <패키지이름>: 특정 패키지와 해당 패키지가 의존하는 패키지까지 빌드$ colcon build --symlink-install

$ sudo apt install python3-pip
$ pip3 install setuptools==58.2.0
$ ros2 run my_rclpy_package helloworld_publisher
이때, 아래와 같은 에러 발생함

처음 ros2를 설치했을 때와 마찬가지로 어딘가에 우리가 만든 패키지가 설치되었지만, 그 경로를 찾지 못하는 경우임
아래 명령어를 통해 환경 설정
$ . ~/my_ws/install/local_setup.bash
다시 publisher를 실행하면 아래와 같이 동작

새로운 탭을 열고 아래 명령어로 subscriber도 실행
$ ros2 run my_rclpy_package helloworld_subscriber

$rqt_graph 시각화 결과
ROS는 자주 사용하는 메세지 타입에 대해 미리 정의해 놓았기 때문에, 필요시 이를 가져와 사용할 수 있음
하지만 로봇을 이용한 서비스를 개발하는 과정에서 미리 정의된 메세지 타입이 아닌 새로운 타입이 필요한 경우가 있음
이러한 경우 custom interface를 생성해야하며, topic을 통해 송수신하는 정보는 메세지(msg)이므로, 새로운 msg 타입을 생성함
$ cd ~/my_ws/src
$ ros2 pkg create --build-type ament_cmake msg_interface_example
$ cd msg_interface_example
$ mkdir msg
$ cd msg
$ code TwoTextMessage.msg
<데이터타입> <데이터필드 이름> 형태로 각 데이터 필드 작성builtin_interfaces/Time stamp부분은 해당 메세지가 토픽을 통해 발행되었을 때, 발행 시간을 기록하기 위한 데이터 필드이며, 로봇 소프트웨어에서는 매우 많은 토픽 데이터가 쏟아지기 때문에 각 토픽마다 발행된 시간을 알고 있어야 데이터 동기화 등이 가능함string text_a, string text_b는 문자열데이터를 의미. 총 2개가 결합되어 있음# Messages
builtin_interfaces/Time stamp
string text_a
string text_b
package.xml파일을 다음과 같이 수정<buildtool_depend>, <exec_depend>, <member_of_group> 부분 작성에 유의$ cd ..
$ code package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>msg_interface_example</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="kym@todo.todo">kym</maintainer>
<license>TODO: License declaration</license>
<buildtool_depend>ament_cmake</buildtool_depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>builtin_interfaces</exec_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<export>
<build_type>ament_cmake</build_type>
</export>
</package>
find_package 파트에서 의존하고 있는 외부 패키지를 불러와 빌드할 때 사용set부분에 우리가 정의한 msg 파일의 목록을 작성(오타주의, 위에서 생성한 msg 파일 이름과 정확히 동일해야함).$ code CMakeLists.txt
cmake_minimum_required(VERSION 3.8)
project(msg_interface_example)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
################################################################################
# Find and load build settings from external packages
################################################################################
find_package(ament_cmake REQUIRED)
find_package(builtin_interfaces REQUIRED)
find_package(rosidl_default_generators REQUIRED)
################################################################################
# Declare ROS messages, services and actions
################################################################################
set(msg_files
"msg/TwoTextMessage.msg"
)
rosidl_generate_interfaces(${PROJECT_NAME}
${msg_files}
DEPENDENCIES builtin_interfaces
)
################################################################################
# Macro for ament package
################################################################################
ament_export_dependencies(rosidl_default_runtime)
ament_package()
set(msg_files
"msg/TwoTextMessage.msg"
"msg/TwoTextMessage2.msg"
)
$ cd ~/my_ws
$ colcon build --symlink-install
이제 우리가 정의한 custom interface를 노드에서 사용하여 토픽을 publish, subscribe하는 방법을 알아본다.
새로운 패키지 생성
$ cd ~/my_ws/src
$ ros2 pkg create my_custom_msg_topic_package --build-type ament_python --dependencies rclpy std_msgs msg_interface_example
$ cd my_custom_msg_topic_package/my_custom_msg_topic_package
$ code custom_topic_publisher.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from msg_interface_example.msg import TwoTextMessage # std_msgs의 String 대신 우리가 정의한 custom interface를 import
class HelloworldPublisher(Node):
def __init__(self):
super().__init__('custom_msg_publisher')
qos_profile = QoSProfile(depth=10)
self.helloworld_publisher = self.create_publisher(TwoTextMessage, 'my_custom_msg', qos_profile) # 토픽에 넣을 데이터 타입으로 우리가 정의한 TwoTextMessage 타입을 입력
self.timer = self.create_timer(1, self.publish_helloworld_msg)
self.count = 0
def publish_helloworld_msg(self):
msg = TwoTextMessage() # 우리가 정의한 TwoTextMessage 타입을 선언
msg.stamp = self.get_clock().now().to_msg() # stamp 데이터 필드에 시간정보 입력
msg.text_a = 'text_a: {0}'.format(self.count) # text_a 데이터 필드에 string 데이터 입력. 앞에서는 msg.data에 string데이터를 입력했지만, 우리가 정의한 TwoTextMessage 타입에는 data라는 이름의 데이터 필드가 없음에 유의
msg.text_b = 'text_b: {0}'.format(self.count*2)
self.helloworld_publisher.publish(msg)
self.get_logger().info('Published message: {0}'.format(msg.text_a))
self.get_logger().info('Published message: {0}'.format(msg.text_b))
self.count += 1
def main(args=None):
rclpy.init(args=args)
node = HelloworldPublisher()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
$ code custom_topic_subscriber.py
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from msg_interface_example.msg import TwoTextMessage # std_msgs의 String 대신 우리가 정의한 custom interface를 import
class HelloworldSubscriber(Node):
def __init__(self):
super().__init__('custom_msg_subscriber')
qos_profile = QoSProfile(depth=10)
self.helloworld_subscriber = self.create_subscription(
TwoTextMessage,
'my_custom_msg',
self.subscribe_topic_message,
qos_profile) # TwoTextMessage타입의 토픽을 수신
def subscribe_topic_message(self, msg): # TwoTextMessage 타입의 msg 데이터 내의 text_a, text_b 필드를 출력
self.get_logger().info('Received message: {0}'.format(msg.text_a))
self.get_logger().info('Received message: {0}'.format(msg.text_b))
def main(args=None):
rclpy.init(args=args)
node = HelloworldSubscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
msg_interface_example 패키지(앞에서 생성한 우리의 custom interface 패키지)가 들어있는 것을 확인할 수 있음$ cd ..
$ code package.xml
<?xml version="1.0"?>
<?xml-model href="http://download.ros.org/schema/package_format3.xsd" schematypens="http://www.w3.org/2001/XMLSchema"?>
<package format="3">
<name>my_custom_msg_topic_package</name>
<version>0.0.0</version>
<description>TODO: Package description</description>
<maintainer email="kym@todo.todo">kym</maintainer>
<license>TODO: License declaration</license>
<depend>rclpy</depend>
<depend>std_msgs</depend>
<depend>msg_interface_example</depend>
<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
<test_depend>ament_pep257</test_depend>
<test_depend>python3-pytest</test_depend>
<export>
<build_type>ament_python</build_type>
</export>
</package>
$ code setup.py
from setuptools import find_packages, setup
package_name = 'my_custom_msg_topic_package'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='kym',
maintainer_email='kym@todo.todo',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'custom_topic_publisher = my_custom_msg_topic_package.custom_topic_publisher:main',
'custom_topic_subscriber = my_custom_msg_topic_package.custom_topic_subscriber:main',
],
},
)
$ cd ~/my_ws
$ colcon build --symlink-install
$. install/local_setup.bash
$ ros2 run my_custom_msg_topic_package custom_topic_publisher
$ ros2 run my_custom_msg_topic_package custom_topic_subscriber
$ rqt_graph
