args 매개변수는 명령줄 인자를 ROS 2 노드에 전달하는 데 사용되며, 대부분의 경우 sys.argv를 통해 전달ROS 2 런타임과 통신을 시작하기 전에, 필수적인 설정과 리소스 할당을 수행지정된 노드를 실행하고, ROS 2 시스템으로부터 메시지를 수신 대기spin 함수는 프로그램이 종료될 때까지 블로킹새로운 데이터가 도착하거나 이벤트가 발생할 때마다 -> 해당 노드의 콜백 함수를 호출하여 처리.destroy_node()rclpy.shutdown()rclpy 라이브러리의 사용을 종료하고 ROS 2 시스템과의 통신을 정리모든 노드의 실행이 완료된 후에 실행되어야 함Node 인스턴스 자체는 독립된 프로세스로 실행되지 않습니다. 실제 프로세스 또는 스레드 경계와는 독립적다른 스크립트를, 각각의 터미널이 따로 실시하면, 그들은 다른 프로세스에서 구동됨.std_msgs.msg.String은 표준 문자열 메시지 타입qos_profile은 rclpy.qos.QoSProfile 객체를 사용하여 정의됨import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
from cv_bridge import CvBridge
import cv2
from multiprocessing import Process
import sys
class ImageProcessor(Node):
def __init__(self):
super().__init__('image_processor')
self.subscription = self.create_subscription(
Image,
'raw_image',
self.listener_callback,
10)
self.publisher = self.create_publisher(
Image,
'processed_image',
10)
self.bridge = CvBridge()
self.timer = self.create_timer(0.5, self.timer_callback) # 0.5초마다 호출
self.last_image = None
def listener_callback(self, msg):
try:
cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
self.last_image = cv_image
except Exception as e:
self.get_logger().error('Failed to convert image: %r' % (e,))
def timer_callback(self):
if self.last_image is not None:
processed_image = cv2.cvtColor(self.last_image, cv2.COLOR_BGR2GRAY)
processed_image = cv2.cvtColor(processed_image, cv2.COLOR_GRAY2BGR)
processed_image[:] = 0
try:
ros_image = self.bridge.cv2_to_imgmsg(processed_image, "bgr8")
self.publisher.publish(ros_image)
self.get_logger().info('Processed image published')
except Exception as e:
self.get_logger().error('Failed to convert image: %r' % (e,))
def run_node():
rclpy.init()
image_processor = ImageProcessor()
rclpy.spin(image_processor)
image_processor.destroy_node()
rclpy.shutdown()
def main():
if len(sys.argv) > 1 and sys.argv[1] == 'process':
# 별도 프로세스에서 노드 실행
process = Process(target=run_node)
process.start()
process.join()
else:
# 현재 프로세스에서 노드 실행
run_node()
if __name__ == '__main__':
main()
std_msgs.msg.String은 표준 문자열 메시지 타입topic: 구독할 토픽의 이름
callback: 토픽에 대한 메시지를 수신할 때마다 호출될 콜백 함수
rclpy.qos.QoSProfile 객체를 사용하여 정의import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class MySubscriber(Node):
def __init__(self):
super().__init__('my_subscriber')
self.subscription = self.create_subscription(
String,
'topic',
self.listener_callback,
10)
self.subscription # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info('I heard: "%s"' % msg.data)
def main(args=None):
rclpy.init(args=args)
my_subscriber = MySubscriber()
rclpy.spin(my_subscriber)
my_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
create_publisher 함수의 인자 중 queue_size는 ROS 1에서 사용되었으나, ROS 2에서는 qos_profile로 대체되었습니다. qos_profile 인자를 사용하여 메시지의 메시지의 신뢰성, 내구성, 전달 속도 등과 관련된 다양한 속성이 포함RELIABLE 및 TRANSIENT_LOCAL 설정이 중요BEST_EFFORT 및 VOLATILE 설정이 더 적합퍼블리셔의 메시지 큐의 크기를 정의보유할 수 있는 메시지의 최대 개수를 결정queue_size=10은 최대 10개의 메시지를 큐에 보관할 수 있다는 것을 의미import rclpy
from rclpy.qos import QoSPresetProfiles
from std_msgs.msg import String
def main():
rclpy.init()
node = rclpy.create_node('qos_example_node')
# SYSTEM_DEFAULT QoS 프로파일 사용
qos_profile_system_default = QoSPresetProfiles.SYSTEM_DEFAULT
# SENSOR_DATA QoS 프로파일 사용
qos_profile_sensor_data = QoSPresetProfiles.SENSOR_DATA
# 퍼블리셔 생성
publisher_system_default = node.create_publisher(
String, 'system_default_topic', qos_profile_system_default)
publisher_sensor_data = node.create_publisher(String, 'sensor_data_topic',
qos_profile_sensor_data)
# 메시지 생성
msg = String()
msg.data = 'Hello, World!'
# 일부 코드에서는 여기서 메시지를 퍼블리시하여 사용할 수 있습니다.
# 노드 종료
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
System Default (시스템 기본값):
qos_profile=rclpy.qos.QoSPresetProfiles.SYSTEM_DEFAULTSensor Data (센서 데이터):
높은 데이터 속도가 필요한 센서 데이터 전송에 적합낮은 신뢰성과 함께 데이터의 신속한 전달을 중시qos_profile=rclpy.qos.QoSPresetProfiles.SENSOR_DATAParameters (파라미터):
높은 신뢰성과, 보다 느린 전송 속도qos_profile=rclpy.qos.QoSPresetProfiles.PARAMETERSServices (서비스):
서비스 요청과 응답에 사용qos_profile=rclpy.qos.QoSPresetProfiles.SERVICES_DEFAULTParameter Events (파라미터 이벤트):
파라미터 변경에 대한 알림에 사용qos_profile=rclpy.qos.QoSPresetProfiles.PARAMETER_EVENTSrclpy.qos.QoSProfile / QoSReliabilityPolicy: QoS 설정 커스터마이징rclpy.qos.QoSProfile을 사용하여 필요한 옵션을 조정할 수 있습니다. 예를 들어:from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy
custom_qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
depth=10
)
RELIABLE 정책을 사용하고, 큐의 크기를 10으로 설정합니다.RELIABLE은 모든 메시지가 도착할 것으로 예상되며, 네트워크 문제가 발생하면 재전송을 시도BEST_EFFORT는 높은 성능을 위해 신뢰성을 희생할 수 있으며, 메시지 손실이 발생할 수 있음from rclpy.qos import QoSProfile
from rclpy.qos import ReliabilityPolicy
qos_profile = QoSProfile(
reliability=ReliabilityPolicy.RELIABLE
)
TRANSIENT_LOCAL은 메시지가 노드의 생명주기를 넘어서 지속되도록 합니다. 새로운 구독자가 연결될 때 과거의 메시지를 받을 수 있습니다.VOLATILE은 메시지가 오래 보관되지 않고 빠르게 사라질 것임을 의미from rclpy.qos import DurabilityPolicy
qos_profile = QoSProfile(
durability=DurabilityPolicy.TRANSIENT_LOCAL
)
from rclpy.qos import DeadlinePolicy
qos_profile = QoSProfile(
deadline=DeadlinePolicy(rclpy.duration.Duration(seconds=2))
)
from rclpy.qos import LivelinessPolicy
qos_profile = QoSProfile(
liveliness=LivelinessPolicy.AUTOMATIC
)
from rclpy.qos import LifespanPolicy
qos_profile = QoSProfile(
lifespan=LifespanPolicy(rclpy.duration.Duration(seconds=5))
)
KEEP_LAST는 최근 메시지만 저장하며, 큐 크기는 저장할 메시지의 수를 정의합니다.KEEP_ALL은 모든 메시지를 저장하려고 시도합니다.from rclpy.qos import HistoryPolicy
qos_profile = QoSProfile(
history=HistoryPolicy.KEEP_LAST,
depth=10 # 큐 크기와 함께 사용됩니다.
)