The Robot Operating System (ROS) is a set of software libraries and tools that help you build robot applications. From drivers to state-of-the-art algorithms, and with powerful developer tools, ROS has what you need for your next robotics project. And it's all open source.
로봇 운영체제(ROS, Robot Operating System)는 로봇 응용 프로그램 개발을 위한 오픈소스 소프트웨어 프레임워크로, 다음과 같은 기능을 제공한다: [1]
이 글에서는 ROS1을 어느 정도 사용해 본 개발자를 대상으로 ROS2의 개념과 구조를 설명할 예정이다. 만약 ROS 자체가 처음이라면, 아래 글들을 먼저 참고하는 것을 권장한다:
ROS1은 연구 및 교육 목적으로 훌륭한 기능을 제공하였으나, 다음과 같은 한계점을 갖고 있었다:
이러한 문제를 해결하기 위해 ROS2가 개발되었으며, ROS1과는 여러 핵심적인 차이점이 존재한다. 그중 가장 근본적인 변화는 통신 구조의 변경이다.
ROS1은 마스터 노드(roscore)를 중심으로 각 노드들이 등록하여 상호 통신하는 구조를 사용하였다. 이 구조는 다음과 같은 단점을 초래했다:
$ roscore
명령어를 통해 마스터 노드를 실행해야 함이러한 중앙 집중식 구조는 확장성과 내결함성 측면에서 제약이 컸다.
ROS2는 이러한 한계를 극복하기 위해 DDS(Data Distribution Service) 기반의 통신 미들웨어를 채택하였다. DDS는 다음과 같은 특징을 가진다:
결과적으로, ROS2는 별도의 마스터 노드 없이도 분산 환경에서 유연하고 안정적으로 동작할 수 있는 구조를 갖추게 되었으며, 다양한 산업 및 실시간 애플리케이션에 적합한 프레임워크로 발전하였다.
서브넷의 기술적인 내용을 서술하는 것이 아니라, ROS2 DDS를 사용하는데 있어서 불편함이 없도록 아주 생략하여 설명한다.
네트워크 안의 네트워크로, 간단히 말하여 IP주소의 일부를 공유하는 부분을 의미한다. 예를 들어, 192.168.0.xx/24 이런 IP주소 형식을 가지고 있는 같은 네트워크 안의 컴퓨터 집합이면, 같은 서브넷에 속한다고 볼 수 있다.
만약, 분산 시스템을 구축하고자 한다면, 서로 다른 호스트에 할당된 IP가 같은 IP주소 형식을 가지고 있는지 확인하자.
ROS1에서는 패키지를 빌드하기 위해 catkin 빌드 시스템을 사용하였으나, ROS2에서는 colcon 빌드 시스템을 채택하였다. 이 변경은 단순한 빌드 도구의 교체가 아니라, 빌드 구조와 방식 전반의 개선을 의미한다.
이전과의 주요 차이점은 다음과 같다:
ros1_ws/
├── src/
├── build/
├── devel/ ← 'setup.bash' 위치
ros2_ws/
├── src/
├── build/
├── install/ ← 'setup.bash' 위치
├── log/
이러한 구조적 변화는 빌드의 안정성, 유연성, 유지보수성 측면에서 긍정적인 효과를 제공한다.
다만, 이 외에도 코드 구현 방식, 런타임 인터페이스, 노드 실행 구조 등에서의 변화가 존재하며, 이에 대해서는 후속 항목에서 상세히 설명할 예정이다.
ROS2의 코드 구현 방식, 런타임 인터페이스, 노드 실행 구조 등을 이해하기 위한 개발 환경은 다음과 같다:
이 글에서는 Ubuntu나 ROS2 설치 과정에 대해서는 다루지 않는다. 우분투 공식 릴리즈 및 ROS2의 공식 도큐먼트를 참조하여 직접 설치해 주시기 바란다. 공식 문서를 따라가면 특별한 문제 없이 설치가 가능할 것이다.
위의 개발 환경이 정상적으로 구축되었다면, 본격적으로 ROS2 개발 환경을 설정할 수 있다.
ROS2는 통신 미들웨어로 DDS(Data Distribution Service)를 사용하여, 같은 서브넷에 있는 호스트들이 자동으로 서로를 감지하고 통신할 수 있다. 이는 ROS2의 큰 장점이지만, 의도하지 않게 다른 네트워크가 혼용되는 문제를 발생시킬 수 있다. 이러한 문제를 방지하기 위해, ROS_DOMAIN_ID라는 환경 변수를 설정하여 같은 도메인 아이디를 가진 호스트끼리만 통신할 수 있도록 한다.
ROS_DOMAIN_ID를 설정하면, 같은 서브넷과 같은 도메인 아이디를 가진 호스트들만 서로를 감지하고 통신하게 된다.
echo 'export ROS_DOMAIN_ID=<YOUR-DOMAIN-ID>' >> ~/.bashrc # ROS_DOMAIN_ID has to be uint type
source ~/.bashrc
ROS_DOMAIN_ID는 0 이상의 정수로 설정하며, 별도로 설정하지 않으면 기본값인 0이 적용된다.
ROS2 작업 공간을 생성하려면 아래와 같이 디렉토리를 만들고, src 폴더를 생성한다.
mkdir ros2_test_ws
cd ros2_test_ws
mkdir src
디렉토리 구조는 다음과 같이 설정된다:
$ tree
.
└── src
이제, ROS2 작업 공간을 최초로 빌드한다.
colcon build
이 명령을 실행하면, 자동으로 build
, install
, log
폴더가 생성되며 빌드가 진행된다.
주의사항: 빌드를 관리자 권한으로 실행하지 않도록 한다. 관리자 권한으로 빌드하면, 빌드된 파일에 접근할 권리가 일반 사용자에게 부여되지 않아 문제가 발생할 수 있다. 따라서 반드시 일반 사용자 권한으로 빌드해야 한다.
$ colcon build
Summary: 0 packages finished [0.17s]
빌드가 완료되면, 디렉토리 구조는 다음과 같다:
$ tree
.
├── build
│ └── COLCON_IGNORE
├── install
│ ├── COLCON_IGNORE
│ ├── local_setup.bash
│ ├── local_setup.ps1
│ ├── local_setup.sh
│ ├── _local_setup_util_ps1.py
│ ├── _local_setup_util_sh.py
│ ├── local_setup.zsh
│ ├── setup.bash
│ ├── setup.ps1
│ ├── setup.sh
│ └── setup.zsh
├── log
│ ├── build_2025-06-19_14-09-59
│ │ ├── events.log
│ │ └── logger_all.log
│ ├── COLCON_IGNORE
│ ├── latest -> latest_build
│ └── latest_build -> build_2025-06-19_14-09-59
└── src
7 directories, 15 files
빌드가 완료되면, install/setup.bash
파일을 환경 변수에 등록하여, ROS2 작업 환경을 설정한다.
echo 'export ROS_WS=<YOUR-ROS-WORKSPACE>' >> ~/.bashrc
echo 'source $ROS_WS/install/setup.bash' >> ~/.bashrc
source ~/.bashrc
ROS2에서 패키지는 ament_python
또는 ament_cmake
빌드 타입을 선택하여 생성할 수 있다. ament_python은 Python으로 작성된 패키지, ament_cmake는 C++로 작성된 패키지를 의미한다. 이 실습에서는 Python 기반 패키지를 예시로 설명할 것이다.
패키지를 생성하려면 다음과 같은 명령어를 사용한다:
cd src
ros2 pkg create --build-type ament_python <YOUR-PACKAGE-NAME>
cd ..
colcon build
위 명령어를 실행하면, 아래와 같이 패키지가 성공적으로 생성된다.
going to create a new package
package name: test_package
destination directory: /home/min/7cmdehdrb/ros2_test_ws
package format: 3
version: 0.0.0
description: TODO: Package description
maintainer: ['min <7cmdehdrb@naver.com>']
licenses: ['TODO: License declaration']
build type: ament_python
dependencies: []
creating folder ./test_package
creating ./test_package/package.xml
creating source folder
creating folder ./test_package/test_package
creating ./test_package/setup.py
creating ./test_package/setup.cfg
creating folder ./test_package/resource
creating ./test_package/resource/test_package
creating ./test_package/test_package/__init__.py
creating folder ./test_package/test
creating ./test_package/test/test_copyright.py
creating ./test_package/test/test_flake8.py
creating ./test_package/test/test_pep257.py
[WARNING]: Unknown license 'TODO: License declaration'. This has been set in the package.xml, but no LICENSE file has been created.
It is recommended to use one of the ament license identitifers:
Apache-2.0
BSL-1.0
BSD-2.0
BSD-2-Clause
BSD-3-Clause
GPL-3.0-only
LGPL-3.0-only
MIT
MIT-0
이렇게 생성된 패키지는 지정한 이름을 가지며, package.xml, setup.py, setup.cfg 등 ROS2 패키지에 필요한 기본 파일들을 포함하고 있다.
패키지의 디렉토리 구조는 다음과 같다:
.
└── <YOUR-PACKAGE-NAME>
├── package.xml
├── resource
│ └── <YOUR-PACKAGE-NAME>
├── setup.cfg
├── setup.py
├── test
│ ├── test_copyright.py
│ ├── test_flake8.py
│ └── test_pep257.py
└── <YOUR-PACKAGE-NAME>
└── __init__.py
이 때, 앞으로 작업하는 모든 Python 코드는 /<YOUR-PACKAGE-NAME>/<YOUR-PACKAGE-NAME>
폴더에서 작성하여야 한다.
ROS1과 달리, ROS2에서는 ament-python을 통해 Python 패키지를 빌드, 테스트 및 설치한다. 이는 setuptools
기반의 Python 패키지 빌드 방식이기 때문에, 실행 구조에 있어 ROS1과 명확한 차이가 있다.
#!/usr/bin/python3
)setup.py
를 통해 install
폴더에 빌드이 구조적 차이로 인해, 사용자에게 가장 큰 차이점은 Python 파일을 실행하려면 매번 패키지를 빌드해야 한다는 점과 Visual Studio Code(VSCode)가 빌드된 Python 파일을 참조하도록 설정해야 한다는 점이다.
따라서, VSCode에서 빌드된 파일을 참조할 수 있도록 settings.json
파일을 생성하고 수정해야 한다.
mkdir .vscode # 오타 아님
touch .vscode/settings.json
이후 생성된 settings.json
파일을 아래와 같이 수정하거나, 이미 파일에 내용이 있다면 아래 내용을 추가한다.
{
"python.analysis.extraPaths": [
"./install/<YOUR-PACKAGE-NAME>/lib/python3.10/site-packages", # ament-python
"./install/<YOUR-PACKAGE-NAME>/local/lib/python3.10/dist-packages", # ament-cmake
],
"python.autoComplete.extraPaths": [
"./install/<YOUR-PACKAGE-NAME>/lib/python3.10/site-packages", # ament-python
"./install/<YOUR-PACKAGE-NAME>/local/lib/python3.10/dist-packages", # ament-cmake
],
}
Python 기반 패키지라면, python.analysis.extraPaths
와 python.autoComplete.extraPaths
에 ./install/<YOUR-PACKAGE-NAME>/lib/python3.10/site-packages
를 추가하고, C++ 기반 패키지라면, ./install/<YOUR-PACKAGE-NAME>/local/lib/python3.10/dist-packages
를 추가해야 한다.
이 설정은 VSCode의 설정이며, ROS2와는 직접적인 연관이 없다. 그러나, 이 설정을 하지 않으면, VSCode는 작성한 Python 경로를 참조하지 못하게 되어 자동완성 기능 등에 제한이 발생할 수 있다. 또한, src 폴더 내의 파일을 참조하게 되어 실제 ROS2가 실행하는 파일과 VSCode가 참조하는 파일이 달라지는 문제가 발생할 수 있다. 따라서, 원활한 개발을 위해 이 설정은 적극 권장된다.
ROS2의 노드는 기능적으로 ROS1의 노드와 차이가 없지만, 코드 구현 방식에 있어서 차이가 존재한다.
ROS2에서 노드는 Publisher와 Subscriber를 등록할 수 있다. Publisher는 DDS 네트워크에서 자신이 메시지를 발행하고 있다는 것을 공시(Advertise)하며, Subscriber는 특정 토픽에 대해 발행되는 메시지를 수신한다.
이 과정에서 필요한 주요 요소는 다음과 같다:
아래는 ROS2에서 Python으로 작성된 가장 간단한 Publisher 노드의 예시 코드이다:
# ROS2
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from rclpy.duration import Duration
from rclpy.qos import QoSProfile, qos_profile_system_default
# Message
from std_msgs.msg import *
from geometry_msgs.msg import *
from sensor_msgs.msg import *
from nav_msgs.msg import *
from visualization_msgs.msg import *
# TF
from tf2_ros import *
# Python
import numpy as np
class SimplePublisherNode(Node):
def __init__(self):
super().__init__("simple_publisher_node")
# Create a publisher. Message type is String, topic name is "/chatter" and QoS is qos_profile_system_default
self.publisher = self.create_publisher(
String, "/chatter", qos_profile_system_default
)
# Timer to publish messages periodically
self.timer = self.create_timer(0.1, self.publish_message)
# Publish simple string message
def publish_message(self):
msg = String()
msg.data = "Hello, ROS2!"
self.publisher.publish(msg)
self.get_logger().info(f"Published message: {msg.data}")
def main():
rclpy.init(args=None)
node = SimplePublisherNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
ROS2에서 노드 생성 방식은 ROS1과 다르다.
ROS1에서는 노드가 코드 그 자체였으며, rospy.init_node()
를 통해 해당 코드가 노드의 기능을 수행함을 선언하여 노드를 생성했다.
ROS2에서는 노드를 캡슐화하여, 반드시 노드 인스턴스를 생성해야 한다는 규칙이 추가되었다.
위의 코드에서 가장 특징적인 부분은 노드 인스턴스를 생성하는 부분이다. 예를 들어, SimplePublisherNode
는 Node
클래스를 상속받은 사용자 정의 클래스이고, 해당 클래스의 인스턴스를 node = SimplePublisherNode()
로 생성한다.
이와 같은 구조 변화로 인해, 기존 rospy
에서 제공하던 함수들은 대부분 Node
클래스의 메소드로 대체되었다.
변경된 Node 구조 덕분에 한 코드 내에서 두 개 이상의 노드를 생성하는 것도 가능하다. 그러나 실제로 두 개 이상의 노드를 선언하면 유지보수에 많은 노력이 필요하므로, 이는 피하는 것이 권장된다.
아래는 ROS2에서 Python으로 작성된 가장 간단한 Subscriber 노드의 예시 코드이다. ROS2의 규칙에 따르는 부분만 제외한다면, ROS1의 구조와 큰 차이는 없다.
# ROS2
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from rclpy.duration import Duration
from rclpy.qos import QoSProfile, qos_profile_system_default
# Message
from std_msgs.msg import *
from geometry_msgs.msg import *
from sensor_msgs.msg import *
from nav_msgs.msg import *
from visualization_msgs.msg import *
# TF
from tf2_ros import *
# Python
import numpy as np
class SimpleSubscriberNode(Node):
def __init__(self):
super().__init__("simple_subscriber_node")
# Create a subscriber for a topic, e.g., 'chatter'
self.subscriber = self.create_subscription(
String, "/chatter", self.listener_callback, qos_profile_system_default
)
def listener_callback(self, msg: String):
self.get_logger().info(f"Received message: {msg.data}")
def main():
rclpy.init(args=None)
node = SimpleSubscriberNode()
rclpy.spin(node)
node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
ROS1에서 ROS2로 넘어오면서, rospy
에서 제공되던 많은 함수들이 Node
클래스의 메소드와 rclpy
로 통합되었다. 이 문단에서는 ROS1에서 ROS2로 넘어오는 사람들이 자주 사용하는 함수들을 정리하여, 혼동을 최소화하고 쉽게 적응할 수 있도록 돕는다.
단, 이 문단에서는 각 함수들의 기능만을 간단히 소개하며, 각 함수가 요구하는 Arguments에 대해서는 다루지 않는다. 이 부분은 ROS2의 공식 도큐멘트나 VSCode의 자동 완성 기능을 참고하는 것이 좋다.
주의!:
Node
의 함수로 소개한 것들은 모두 static method가 아니므로, 실제 사용 시Node
인스턴스를 통해 호출해야 한다.
Node.get_logger().info()
Node.get_logger().warn()
Node.get_logger().error()
Node.get_logger().debug()
Node.get_logger().fatal()
이 함수들은 디버깅 레벨에 맞춰 로그를 출력하는 함수들이다. 각 함수는 해당 레벨에 맞는 로그 메시지를 기록한다.
Node.get_clock().now()
이 함수는 Time
객체를 반환하는 함수로, ROS2에서 시간 관련 작업을 할 때 사용된다. 반환된 Time
객체는 Header
와 같은 stamp
가 필요한 메시지 필드에 적용할 때 .to_msg()
함수를 사용해 메시지 형식으로 변환해야 한다.
주의!:
Node.get_clock().now()
는 노드의 시간을 의미하는 객체를 반환한다.rclpy.time.Time()
를 통하여 선언한 객체는 초기화 된 시간을 의미하기 때문에, 서로 다른 값을 가질 수 있으므로, 이를 혼용하지 않도록 주의해야 한다.
Node.declare_parameter()
Node.get_parameter()
이 함수들은 ROS Parameter Server에 등록된 파라미터를 선언하고 읽어오는 함수들이다. declare_parameter()
함수는 반드시 get_parameter()
함수보다 먼저 호출되어야 한다. 이는 get_parameter()
함수가 해당 노드에서 이미 선언된 파라미터만 읽어올 수 있기 때문이다.
Node.create_publisher()
Node.create_subscription()
이 함수들은 각각 Publisher
와 Subscriber
객체를 생성한다. create_publisher()
는 데이터를 발행하는 Publisher
를, create_subscription()
은 특정 토픽을 구독하는 Subscriber
를 생성하는 함수이다.
Node.create_timer()
이 함수는 Timer
객체를 생성하며, 지정된 콜백 함수를 주어진 주기마다 자동으로 호출한다. ROS1에서 while 문을 사용하여 구현하던 기능들을 create_timer()
함수로 일부 대체할 수 있지만, spin()
과의 결합을 고려해야 하기 때문에 완전히 대체할 수는 없다.
Publisher와 Subscriber를 통한 통신은 같은 DDS 네트워크에 포함된 노드끼리 비동기식으로 처리된다. 즉, 발신자는 상대방의 응답을 기다리지 않고 계속해서 메시지를 송수신한다. 따라서 센서 데이터와 같은 데이터는 한 번 정도 누락되더라도 큰 문제가 되지 않는 경우에 많이 사용된다. 이러한 상황에서는 Publisher-Subscriber 구조가 매우 적합하다.
하지만, 만약 메시지를 단 한 번만 발송하고, 그 메시지가 반드시 수신되어야 하는 상황이라면 얘기가 달라진다. 이 경우, 메시지를 보내는 측은 반드시 메시지를 받는 측이 정확하게 메시지를 수신했는지를 확인할 필요가 있다. 이러한 요구 사항에 적합한 ROS2의 통신 구조는 바로 Service 방식 통신이다.
Service 통신 구조는 노드에 서비스 서버를 생성하고, 다른 노드에서 서비스 클라이언트를 생성하는 방식으로 이루어진다. 클라이언트는 서비스 메시지 타입에 맞는 요청을 서버로 보내고, 서버는 이를 받아 클라이언트에 적절한 응답을 반환한다.
이 방식은 동기식 통신으로, 메시지가 확실하게 전송되고 응답을 받을 수 있어, 메시지의 수신을 반드시 확인해야 하는 상황에 매우 적합하다. 그러나 응답을 기다리는 특성상, 연속적인 메시지 송수신에는 적합하지 않다는 한계가 있다.
Service 기반 ROS2 통신을 위해서는 Request-Response 메시지를 정의해야 한다. 아래는 custom_msgs
라는 패키지를 만들고, 그 패키지에 Request-Response 메시지인 srv 파일을 선언하는 과정을 설명한다:
cd src
ros2 pkg create --build-type ament_cmake custom_msgs
srv
파일 생성mkdir custom_msgs/srv
touch custom_msgs/srv/SimpleService.srv
SimpleService.srv
내용 작성
string request
---
string response
package.xml
, CMakeLists.txt
수정package.xml
에 다음 내용을 추가한다. 다른 패키지 의존성이 있는 경우 (예: 다른 메시지 타입이 srv
에 포함된 경우) 해당 의존성도 추가해야 한다. <build_depend>rosidl_default_generators</build_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
<depend>std_msgs</depend>
CMakeLists.txt
에 다음 내용을 추가한다. find_package()
를 통해 추가한 패키지 의존성은 package.xml
과 일치해야 한다.find_package(rosidl_default_generators REQUIRED)
find_package(std_msgs REQUIRED) # YOUR DEPENDENT MESSAGES
rosidl_generate_interfaces(${PROJECT_NAME}
"srv/SimpleService.srv"
DEPENDENCIES std_msgs # YOUR DEPENDENT MESSAGES
)
colcon build
주의! :
UserWarning: Unknown distribution option:'tests_require'
오류가 발생하면,setup.py
의tests_require=["pytest"]
부분을 주석 처리하면 해결된다.
from custom_msgs.srv import SimpleService
위 코드가 정상적으로 동작하는지 확인한다. 빌드가 성공했으나 해당 패키지를 찾을 수 없다는 에러가 발생할 경우, 환경변수에 빌드 결과가 등록되었는지 확인해본다. 이를 위해 다음 명령어를 실행한다:
source ~/.bashrc
또는
source install/setup.bash
위 명령어를 통해 환경변수에 빌드 결과를 등록했는지 확인한다. 만약 빌드가 성공하고 코드도 동작하지만 VSCode에서 인식되지 않는다면, VSCode 설정 과정도 확인해야 한다.
# ROS2
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from rclpy.duration import Duration
from rclpy.qos import QoSProfile, qos_profile_system_default
# Message
from std_msgs.msg import *
from geometry_msgs.msg import *
from sensor_msgs.msg import *
from nav_msgs.msg import *
from visualization_msgs.msg import *
from custom_msgs.srv import SimpleService
# TF
from tf2_ros import *
# Python
import numpy as np
class SimpleServerNode(Node):
def __init__(self):
super().__init__("simple_server_node")
# Create a service server for a service, e.g., 'add_two_ints'
self.service = self.create_service(
SimpleService,
"/simple_service",
self.handle_service,
qos_profile=qos_profile_system_default,
)
def handle_service(
self, request: SimpleService.Request, response: SimpleService.Response
):
self.get_logger().info(f"Received request: {request.request}")
response.response = "WORLD!"
return response
def main():
rclpy.init(args=None)
node = SimpleServerNode()
# Spin in a separate thread
thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True)
thread.start()
hz = 10.0
rate = node.create_rate(hz)
try:
while rclpy.ok():
rate.sleep()
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
thread.join()
if __name__ == "__main__":
main()
이 서버에서 다루는 메시지는 다음과 같은 구조를 가진다:
string request
---
string response
요청으로 String 메시지 하나를 보내고, 응답으로 String 메시지 하나를 보내는 간단한 구조로, 위에서 작성한 코드는 응답으로 "WORLD!"를 반환한다.
구체적으로 코드를 살펴보면 다음과 같다:
from custom_msgs.srv import SimpleService
이 서비스 메시지 객체는 SimpleService.Request
타입의 request
필드와, SimpleService.Response
타입의 response
필드를 가진다. SimpleService.Request
와 SimpleService.Response
는 srv
에서 선언한 것과 같은 필드를 가진다.
self.service = self.create_service(
SimpleService,
"/simple_service",
self.handle_service,
qos_profile=qos_profile_system_default,
)
SimpleService
타입을 사용하여 /simple_service
라는 이름의 서비스를 생성한다. 이 서비스는 요청을 받으면, self.handle_service
라는 콜백 함수를 사용하여 응답을 보낸다.
def handle_service(
self, request: SimpleService.Request, response: SimpleService.Response
):
self.get_logger().info(f"Received request: {request.request}")
response.response = "WORLD!"
return response
서비스의 콜백 함수는 반드시 request
와 response
두 개의 매개변수를 받는다. request
는 들어온 요청 메시지이고, response
는 응답 메시지이다. 따라서 response
의 필드를 적절히 채워서 서버는 클라이언트에게 응답을 보내게 된다.
# ROS2
import rclpy
from rclpy.node import Node
from rclpy.time import Time
from rclpy.duration import Duration
from rclpy.qos import QoSProfile, qos_profile_system_default
from rclpy.task import Future
# Message
from std_msgs.msg import *
from geometry_msgs.msg import *
from sensor_msgs.msg import *
from nav_msgs.msg import *
from visualization_msgs.msg import *
from custom_msgs.srv import SimpleService
# TF
from tf2_ros import *
# Python
import numpy as np
class SimpleClientNode(Node):
def __init__(self):
super().__init__("simple_client_node")
# Create a service client for a service, e.g., 'add_two_ints'
self.client = self.create_client(
SimpleService, "/simple_service", qos_profile=qos_profile_system_default
)
self.async_response = False
# Wait for the service to be available
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Service not available, waiting...")
def send_request(self):
self.get_logger().info("Sending request to the service...")
# Create a request and send it
request = SimpleService.Request()
request.request = "Hello"
if self.async_response:
# Asynchronous call
future: Future = self.client.call_async(request)
future.add_done_callback(self.response_callback)
else:
result: SimpleService.Response = self.client.call(request)
self.get_logger().info(f"Received response: {result.response}")
def response_callback(self, future: Future):
try:
response: SimpleService.Response = future.result()
self.get_logger().info(f"Received response: {response.response}")
except Exception as e:
self.get_logger().error(f"Service call failed: {e}")
def main():
rclpy.init(args=None)
node = SimpleClientNode()
# Spin in a separate thread
thread = threading.Thread(target=rclpy.spin, args=(node,), daemon=True)
thread.start()
hz = 10.0
rate = node.create_rate(hz)
node.send_request()
try:
while rclpy.ok():
rate.sleep()
except KeyboardInterrupt:
pass
node.destroy_node()
rclpy.shutdown()
thread.join()
if __name__ == "__main__":
main()
self.client = self.create_client(
SimpleService, "/simple_service", qos_profile=qos_profile_system_default
)
self.async_response = False
# Wait for the service to be available
while not self.client.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Service not available, waiting...")
SimpleService
타입을 사용하여 /simple_service
라는 이름의 클라이언트를 생성한다. 이 클라이언트는 서비스 서버와의 연결이 확립될 때까지 1초마다 대기하면서 계속 확인한다. 이는 서비스 서버의 요청을 날리는 것이 아닌, 단순히 서비스 서버가 응답이 가능한 상태인지를 확인하는 것이다.
def send_request(self):
self.get_logger().info("Sending request to the service...")
# Create a request and send it
request = SimpleService.Request()
request.request = "Hello"
if self.async_response:
# Asynchronous call
future: Future = self.client.call_async(request)
future.add_done_callback(self.response_callback)
else:
result: SimpleService.Response = self.client.call(request)
self.get_logger().info(f"Received response: {result.response}")
클라이언트는 동기식과 비동기식 요청을 모두 지원한다. 동기식은 call()
을 사용하여 요청 후 응답을 기다리며, 비동기식은 call_async()
를 사용하여 요청하고, 응답을 기다리지 않고 다른 작업을 계속 수행할 수 있다. 이때 비동기식 응답은 rclpy.task.Future
객체를 통해 처리된다.
call
): SimpleService.Response
객체를 반환.call_async
): rclpy.task.Future
객체를 반환.따라서, 응답 결과에 접근하기 위해서 동기식에서는 단순하게 응답을 리턴하고, 비동기식에서는 rclpy.task.Future
객체를 처리하는 추가적인 함수를 작성해야 한다.
call
은 엄밀히 말하여 클라이언트가 다루는 타입의Response
객체를 반환하는 것이다.
call_async
를 사용한 경우에만 해당한다.
future.add_done_callback(self.response_callback)
Future
객체에서 완료 콜백을 추가하여, 응답이 완료되었을 때 호출되는 함수(response_callback)를 지정한다.
def response_callback(self, future: Future):
try:
response: SimpleService.Response = future.result()
self.get_logger().info(f"Received response: {response.response}")
except Exception as e:
self.get_logger().error(f"Service call failed: {e}")
response_callback 함수에서는 future.result()
를 호출하여 응답 결과에 접근한다. 만약 응답 호출에 실패한 경우, 예외를 처리하여 에러 메시지를 로그로 출력한다.
TF는 ROS1과 ROS2의 사용에 있어서, 마찬가지로 기능적인 변화는 없고 코드 구현 방식에 있어서만 약간의 차이가 존재한다.
만약 TF에 대하여 이해가 없는 상태라면, 이 글을 참고하여 TF가 무엇인지 이해하고 오는 것을 권장한다.
ROS2의 TF를 처리하는 클래스는 다음과 같다:
Buffer
: TF 데이터의 조회 및 관리를 담당TransformListener
: 변환을 수신하는 역할TransformBroadcaster
: 변환을 송신하는 역할이 클래스들은 tf2_ros
에 정의되어 있으며, TF 메세지를 송수신할 경우 반드시 인스턴스를 생성해야 하는 필수적인 요소이다.
아래는 이런 인스턴스를 사용하여 TF를 발행 및 처리하는 클래스의 예시 코드이다.
class SimpleTransformer(object):
def __init__(self, node: Node):
self._node = node
self._tf_buffer = Buffer(node=self._node, cache_time=Duration(seconds=0, nanoseconds=100000000))
self._tf_listener = TransformListener(node=self._node, buffer=self._tf_buffer)
self._tf_broadcaster = TransformBroadcaster(self._node)
def publish_transform(
self,
translation: Vector3,
rotation: Quaternion,
target_frame: str,
source_frame: str,
):
msg = TransformStamped(
header=Header(
stamp=self._node.get_clock().now().to_msg(), frame_id=target_frame
),
child_frame_id=source_frame,
translation=translation,
rotation=rotation,
)
self._tf_broadcaster.sendTransform(msg)
def check_transform_valid(self, target_frame: str, source_frame: str):
try:
is_valid = self._tf_buffer.can_transform(
target_frame,
source_frame,
self._node.get_clock().now().to_msg(),
timeout=Duration(seconds=0.1),
)
if not is_valid:
raise Exception("Transform is not valid")
return is_valid
except Exception as e:
self._node.get_logger().warn(
f"Cannot Lookup Transform Between {target_frame} and {source_frame}"
)
return False
def transform_pose(
self,
pose: PoseStamped,
target_frame: str,
source_frame: str,
) -> PoseStamped:
"""
Transform a pose from the source frame to the target frame.
"""
if not isinstance(pose, PoseStamped):
self._node.get_logger().warn("Input must be of type PoseStamped.")
return None
if self.check_transform_valid(target_frame, source_frame):
try:
pose_stamped = TF2PoseStamped(
header=Header(
stamp=self._node.get_clock().now().to_msg(),
frame_id=source_frame,
),
pose=pose.pose,
)
transformed_data = self._tf_buffer.transform(
object_stamped=pose_stamped,
target_frame=target_frame,
timeout=Duration(seconds=0, nanoseconds=100000000),
)
result = PoseStamped(
header=transformed_data.header, pose=transformed_data.pose
)
return result
except Exception as e:
self._node.get_logger().warn(
f"Cannot Transform Pose from {source_frame} to {target_frame}"
)
return None
return None
TF 수신의 변화점은 다음과 같다:
TransformListener
객체가 Buffer
인스턴스를 요구한다.transformPose
등으로 구분하던 변환 함수가 transform
함수로 통일되며, 이 함수는 매개변수로 object_stamped
를 요구한다.
object_stamped
는 문자 그대로 stamp 필드가 있는 객체를 의미하지만, 일반적인 geometry_msgs에 존재하는 메세지가 아닌,tf2_geometry_msgs
따위에 선언된 메세지를 의미한다. 이들은 서로 이름이 같지만 엄연히 다른 클래스이기 때문에, 명확하게 구분하여야만 한다.
TF의 송신의 변화점은 다음과 같다:
sendTransform
함수가 여러 Arguments를 요구하던 것에서 TransformStamped
타입의 메세지를 요구하는 것으로 변경됨지금까지의 내용을 통하여 ROS2가 되면서 발생한 주요 기능적인 변화, 그리고 코드 구현 방식의 차이를 알아보았다.
ROS2는 기능적인 측면에서 ROS1과 큰 차이는 없으나, 통신 미들웨어의 변화로 인하여 통신 부분에서 큰 진보를 보여주었고, 또한 보안성 측면에서도 크게 향상되었다.
이 글을 읽는 모든 사람들이 ROS2를 이용하여 성공적인 연구를 수행할 수 있을 것을 기원한다.