데이터의 압축 여부, 네트워크 대역폭 사용, 처리 시간, 이미지 품질과 같은 요소들이 차이점Image로 Publish하는 경우Image 메시지는 압축되지 않기 때문에, 데이터의 손실 없이 원본 이미지 품질을 유지CompressedImage로 Publish하는 경우CompressedImage 메시지는 이미지 데이터를 압축하여 네트워크를 통한 전송 시 필요한 대역폭을 크게 줄입니다. Image 메시지는 원본 이미지 품질을 유지해야 하거나, 처리 지연 시간을 최소화해야 하는 경우에 적합합니다.CompressedImage 메시지는 네트워크 대역폭이 제한적이거나, 대량의 이미지 데이터를 효율적으로 전송해야 하는 경우에 유리합니다.CompressedImage를 사용하고, 로컬에서 고정밀 이미지 처리를 수행할 때는 Image 메시지를 사용할 수 있습니다.rclpy를 이용해 depth image와 RGB image를 publish하는 노드를 작성패키지 디렉토리의 your_package_name/your_package_name 폴더 안에 image_publisher.py라는 이름으로 파일을 생성. (아래 코드)
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2
class ImagePublisher(Node):
def __init__(self):
super().__init__('image_publisher')
self.publisher_rgb = self.create_publisher(Image, 'rgb_image', 10)
self.publisher_depth = self.create_publisher(Image, 'depth_image', 10)
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
# 여기서는 예시로 OpenCV를 사용하여 랜덤 이미지를 생성합니다.
# 실제 응용에서는 카메라에서 이미지를 읽어와야 합니다.
# RGB 이미지 생성
rgb_image = np.random.randint(255, size=(480, 640, 3), dtype=np.uint8)
msg_rgb = Image()
msg_rgb.height = 480
msg_rgb.width = 640
msg_rgb.encoding = 'bgr8'
msg_rgb.step = 640 * 3
msg_rgb.data = rgb_image.tobytes()
# Depth 이미지 생성
depth_image = np.random.randint(255, size=(480, 640), dtype=np.uint8)
msg_depth = Image()
msg_depth.height = 480
msg_depth.width = 640
msg_depth.encoding = 'mono8'
msg_depth.step = 640
msg_depth.data = depth_image.tobytes()
# 이미지 publish
self.publisher_rgb.publish(msg_rgb)
self.publisher_depth.publish(msg_depth)
self.get_logger().info('Publishing images')
def main(args=None):
rclpy.init(args=args)
image_publisher = ImagePublisher()
rclpy.spin(image_publisher)
image_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
setup.py( Python 패키지의 설치를 위한 스크립트) 파일을 열고, entry_points에 다음 줄을 추가하여 노드가 실행 가능하게 만듭니다.entry_points={
'console_scripts': [
'image_publisher = your_package_name.image_publisher:main',
],
},
your_package_name을 실제 패키지 이름으로 변경하세요.colcon build --packages-select <your_package_name>
. install/setup.bash
ros2 run <your_package_name> image_publisher
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import cv2
import numpy as np
from cv_bridge import CvBridge
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
self.bridge = CvBridge()
self.subscription_rgb = self.create_subscription(
Image,
'rgb_image',
self.listener_callback_rgb,
10)
self.subscription_rgb # prevent unused variable warning
self.subscription_depth = self.create_subscription(
Image,
'depth_image',
self.listener_callback_depth,
10)
self.subscription_depth # prevent unused variable warning
def listener_callback_rgb(self, msg):
cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
cv2.imshow('RGB Image', cv_image)
cv2.waitKey(1) # Display the image until a keypress
def listener_callback_depth(self, msg):
cv_image = self.bridge.imgmsg_to_cv2(msg, "16UC1")
# Convert depth image to a visual format for display
cv_image_visual = cv2.convertScaleAbs(cv_image, alpha=0.03) # Scale for visualization
cv2.imshow('Depth Image', cv_image_visual)
cv2.waitKey(1) # Display the image until a keypress
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
rclpy.spin(image_subscriber)
image_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
sensor_msgs/msg/CompressedImageimage_compressed_publisher.py라는 이름으로 새 파일을 생성
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import numpy as np
import cv2
class CompressedImagePublisher(Node):
def __init__(self):
super().__init__('compressed_image_publisher')
self.publisher = self.create_publisher(CompressedImage, 'compressed_image', 10)
self.publisher_depth_compressed = self.create_publisher(CompressedImage, 'depth_image/compressed', 10)
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
# 여기서는 예시로 OpenCV를 사용하여 랜덤 이미지를 생성합니다.
# 실제 응용에서는 카메라에서 이미지를 읽어와야 합니다.
# 랜덤 RGB 이미지 생성
image = np.random.randint(255, size=(480, 640, 3), dtype=np.uint8)
# OpenCV를 사용하여 이미지를 JPEG 형식으로 압축
"""
1. **retval (부울)**:
인코딩이 성공적으로 완료되었는지를 나타내는 부울 값
성공하면 `True`, 실패하면 `False`를 반환합니다.
2. **buf (바이트 배열)**:
인코딩된 이미지 데이터를 포함하는 메모리 버퍼
이 데이터는 파일 시스템에 저장하거나 네트워크를 통해 전송할 수 있습니다.
"""
result, encoded_image = cv2.imencode(ext='.jpg', img=image)
if result:
msg = CompressedImage()
msg.header.stamp = self.get_clock().now().to_msg()
msg.format = "jpeg"
msg.data = np.array(encoded_image).tobytes()
# 압축된 이미지 publish
self.publisher.publish(msg)
self.get_logger().info('Publishing compressed image')
####
# Depth 이미지 생성
depth_image = np.random.randint(255, size=(480, 640), dtype=np.uint8)
result, depth_image_compressed = cv2.imencode('.png', depth_image)
if result:
# Depth 이미지를 CompressedImage로 publish
msg_depth_compressed = CompressedImage()
msg_depth_compressed.format = 'png'
msg_depth_compressed.data = np.array(depth_image_compressed).tobytes()
self.publisher_depth_compressed.publish(msg_depth_compressed)
def main(args=None):
rclpy.init(args=args)
compressed_image_publisher = CompressedImagePublisher()
rclpy.spin(compressed_image_publisher)
compressed_image_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
setup.py 파일에 새로운 엔트리 포인트를 추가하고 패키지를 빌드해야 합니다.setup.py 파일을 열고 entry_points 섹션에 다음 줄을 추가합니다:'console_scripts': [
'compressed_image_publisher = your_package_name.image_compressed_publisher:main',
],colcon build --packages-select <your_package_name>. install/setup.bash
ros2 run <your_package_name> compressed_image_publisherimport rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import cv2
import numpy as np
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
self.subscription_rgb = self.create_subscription(
CompressedImage,
'rgb_image/compressed',
self.listener_callback_rgb,
10)
self.subscription_rgb # prevent unused variable warning
self.subscription_depth = self.create_subscription(
CompressedImage,
'depth_image/compressed',
self.listener_callback_depth,
10)
self.subscription_depth # prevent unused variable warning
def listener_callback_rgb(self, msg):
np_arr = np.frombuffer(msg.data, np.uint8)
image_np = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) # Decode to color image
cv2.imshow('RGB Image', image_np)
cv2.waitKey(1) # Display the image until a keypress
def listener_callback_depth(self, msg):
np_arr = np.frombuffer(msg.data, np.uint8)
depth_image_np = cv2.imdecode(np_arr, cv2.IMREAD_ANYDEPTH) # Decode to depth image
cv2.imshow('Depth Image', depth_image_np)
cv2.waitKey(1) # Display the image until a keypress
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
rclpy.spin(image_subscriber)
image_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()