args
매개변수는 명령줄 인자를 ROS 2 노드에 전달하는 데 사용되며, 대부분의 경우 sys.argv
를 통해 전달ROS 2 런타임과 통신을 시작하기 전에, 필수적인 설정과 리소스 할당을 수행
지정된 노드를 실행
하고, ROS 2 시스템으로부터 메시지를 수신 대기
spin
함수는 프로그램이 종료될 때까지 블로킹
새로운 데이터가 도착하거나 이벤트가 발생할 때마다 -> 해당 노드의 콜백 함수를 호출하여 처리
.destroy_node()
rclpy.shutdown()
rclpy
라이브러리의 사용을 종료하고 ROS 2 시스템과의 통신을 정리모든 노드의 실행이 완료된 후에 실행되어야 함
Node
인스턴스 자체는 독립된 프로세스로 실행되지 않습니다. 실제 프로세스 또는 스레드 경계와는 독립적
다른 스크립트를, 각각의 터미널이 따로 실시하면, 그들은 다른 프로세스에서 구동됨.
std_msgs.msg.String
은 표준 문자열 메시지 타입qos_profile
은 rclpy.qos.QoSProfile
객체를 사용하여 정의됨import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
from multiprocessing import Process
import sys
class ImageProcessor(Node):
def __init__(self):
super().__init__('image_processor')
self.subscription = self.create_subscription(
Image,
'raw_image',
self.listener_callback,
10)
self.publisher = self.create_publisher(
Image,
'processed_image',
10)
self.bridge = CvBridge()
self.timer = self.create_timer(0.5, self.timer_callback) # 0.5초마다 호출
self.last_image = None
def listener_callback(self, msg):
try:
cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
self.last_image = cv_image
except Exception as e:
self.get_logger().error('Failed to convert image: %r' % (e,))
def timer_callback(self):
if self.last_image is not None:
processed_image = cv2.cvtColor(self.last_image, cv2.COLOR_BGR2GRAY)
processed_image = cv2.cvtColor(processed_image, cv2.COLOR_GRAY2BGR)
processed_image[:] = 0
try:
ros_image = self.bridge.cv2_to_imgmsg(processed_image, "bgr8")
self.publisher.publish(ros_image)
self.get_logger().info('Processed image published')
except Exception as e:
self.get_logger().error('Failed to convert image: %r' % (e,))
def run_node():
rclpy.init()
image_processor = ImageProcessor()
rclpy.spin(image_processor)
image_processor.destroy_node()
rclpy.shutdown()
def main():
if len(sys.argv) > 1 and sys.argv[1] == 'process':
# 별도 프로세스에서 노드 실행
process = Process(target=run_node)
process.start()
process.join()
else:
# 현재 프로세스에서 노드 실행
run_node()
if __name__ == '__main__':
main()
std_msgs.msg.String
은 표준 문자열 메시지 타입topic: 구독할 토픽의 이름
callback: 토픽에 대한 메시지를 수신할 때마다 호출될 콜백 함수
rclpy.qos.QoSProfile
객체를 사용하여 정의import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MySubscriber(Node):
def __init__(self):
super().__init__('my_subscriber')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
my_subscriber = MySubscriber()
rclpy.spin(my_subscriber)
my_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
create_publisher
함수의 인자 중 queue_size
는 ROS 1에서 사용되었으나, ROS 2에서는 qos_profile
로 대체되었습니다. qos_profile
인자를 사용하여 메시지의 메시지의 신뢰성
, 내구성
, 전달 속도
등과 관련된 다양한 속성이 포함RELIABLE
및 TRANSIENT_LOCAL
설정이 중요BEST_EFFORT
및 VOLATILE
설정이 더 적합퍼블리셔의 메시지 큐의 크기를 정의
보유할 수 있는 메시지의 최대 개수를 결정
queue_size=10
은 최대 10개의 메시지를 큐에 보관할 수 있다는 것을 의미import rclpy
from rclpy.qos import QoSPresetProfiles
from std_msgs.msg import String
def main():
rclpy.init()
node = rclpy.create_node('qos_example_node')
# SYSTEM_DEFAULT QoS 프로파일 사용
qos_profile_system_default = QoSPresetProfiles.SYSTEM_DEFAULT
# SENSOR_DATA QoS 프로파일 사용
qos_profile_sensor_data = QoSPresetProfiles.SENSOR_DATA
# 퍼블리셔 생성
publisher_system_default = node.create_publisher(
String, 'system_default_topic', qos_profile_system_default)
publisher_sensor_data = node.create_publisher(String, 'sensor_data_topic',
qos_profile_sensor_data)
# 메시지 생성
msg = String()
msg.data = 'Hello, World!'
# 일부 코드에서는 여기서 메시지를 퍼블리시하여 사용할 수 있습니다.
# 노드 종료
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
System Default (시스템 기본값):
qos_profile=rclpy.qos.QoSPresetProfiles.SYSTEM_DEFAULT
Sensor Data (센서 데이터):
높은 데이터 속도가 필요한 센서 데이터 전송에 적합
낮은 신뢰성과 함께
데이터의 신속한 전달을 중시
qos_profile=rclpy.qos.QoSPresetProfiles.SENSOR_DATA
Parameters (파라미터):
높은 신뢰성
과, 보다 느린 전송 속도
qos_profile=rclpy.qos.QoSPresetProfiles.PARAMETERS
Services (서비스):
서비스 요청과 응답에 사용
qos_profile=rclpy.qos.QoSPresetProfiles.SERVICES_DEFAULT
Parameter Events (파라미터 이벤트):
파라미터 변경에 대한 알림에 사용
qos_profile=rclpy.qos.QoSPresetProfiles.PARAMETER_EVENTS
rclpy.qos.QoSProfile / QoSReliabilityPolicy
: QoS 설정 커스터마이징rclpy.qos.QoSProfile
을 사용하여 필요한 옵션을 조정할 수 있습니다. 예를 들어:from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
custom_qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
depth=10
)
RELIABLE
정책을 사용하고, 큐의 크기를 10으로 설정합니다.RELIABLE
은 모든 메시지가 도착할 것으로 예상되며, 네트워크 문제가 발생하면 재전송을 시도BEST_EFFORT
는 높은 성능을 위해 신뢰성을 희생할 수 있으며, 메시지 손실이 발생할 수 있음from rclpy.qos import QoSProfile
from rclpy.qos import ReliabilityPolicy
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE
)
TRANSIENT_LOCAL
은 메시지가 노드의 생명주기를 넘어서 지속되도록 합니다. 새로운 구독자가 연결될 때 과거의 메시지를 받을 수 있습니다.VOLATILE
은 메시지가 오래 보관되지 않고 빠르게 사라질 것임을 의미from rclpy.qos import DurabilityPolicy
qos_profile = QoSProfile(
durability=DurabilityPolicy.TRANSIENT_LOCAL
)
from rclpy.qos import DeadlinePolicy
qos_profile = QoSProfile(
deadline=DeadlinePolicy(rclpy.duration.Duration(seconds=2))
)
from rclpy.qos import LivelinessPolicy
qos_profile = QoSProfile(
liveliness=LivelinessPolicy.AUTOMATIC
)
from rclpy.qos import LifespanPolicy
qos_profile = QoSProfile(
lifespan=LifespanPolicy(rclpy.duration.Duration(seconds=5))
)
KEEP_LAST
는 최근 메시지만 저장하며, 큐 크기는 저장할 메시지의 수를 정의합니다.KEEP_ALL
은 모든 메시지를 저장하려고 시도합니다.from rclpy.qos import HistoryPolicy
qos_profile = QoSProfile(
history=HistoryPolicy.KEEP_LAST,
depth=10 # 큐 크기와 함께 사용됩니다.
)