데이터의 압축 여부
, 네트워크 대역폭 사용
, 처리 시간
, 이미지 품질
과 같은 요소들이 차이점Image
로 Publish하는 경우Image
메시지는 압축되지 않기 때문에, 데이터의 손실 없이 원본 이미지 품질을 유지CompressedImage
로 Publish하는 경우CompressedImage
메시지는 이미지 데이터를 압축하여 네트워크를 통한 전송 시 필요한 대역폭을 크게 줄입니다. Image
메시지는 원본 이미지 품질을 유지해야 하거나, 처리 지연 시간을 최소화해야 하는 경우에 적합합니다.CompressedImage
메시지는 네트워크 대역폭이 제한적이거나, 대량의 이미지 데이터를 효율적으로 전송해야 하는 경우에 유리합니다.CompressedImage
를 사용하고, 로컬에서 고정밀 이미지 처리를 수행할 때는 Image
메시지를 사용할 수 있습니다.rclpy
를 이용해 depth image와 RGB image를 publish하는 노드를 작성패키지 디렉토리의 your_package_name/your_package_name
폴더 안에 image_publisher.py
라는 이름으로 파일을 생성. (아래 코드)
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2
class ImagePublisher(Node):
def __init__(self):
super().__init__('image_publisher')
self.publisher_rgb = self.create_publisher(Image, 'rgb_image', 10)
self.publisher_depth = self.create_publisher(Image, 'depth_image', 10)
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
# 여기서는 예시로 OpenCV를 사용하여 랜덤 이미지를 생성합니다.
# 실제 응용에서는 카메라에서 이미지를 읽어와야 합니다.
# RGB 이미지 생성
rgb_image = np.random.randint(255, size=(480, 640, 3), dtype=np.uint8)
msg_rgb = Image()
msg_rgb.height = 480
msg_rgb.width = 640
msg_rgb.encoding = 'bgr8'
msg_rgb.step = 640 * 3
msg_rgb.data = rgb_image.tobytes()
# Depth 이미지 생성
depth_image = np.random.randint(255, size=(480, 640), dtype=np.uint8)
msg_depth = Image()
msg_depth.height = 480
msg_depth.width = 640
msg_depth.encoding = 'mono8'
msg_depth.step = 640
msg_depth.data = depth_image.tobytes()
# 이미지 publish
self.publisher_rgb.publish(msg_rgb)
self.publisher_depth.publish(msg_depth)
self.get_logger().info('Publishing images')
def main(args=None):
rclpy.init(args=args)
image_publisher = ImagePublisher()
rclpy.spin(image_publisher)
image_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
setup.py
( Python 패키지의 설치를 위한 스크립트) 파일을 열고, entry_points
에 다음 줄을 추가하여 노드가 실행 가능하게 만듭니다.entry_points={
'console_scripts': [
'image_publisher = your_package_name.image_publisher:main',
],
},
your_package_name
을 실제 패키지 이름으로 변경하세요.colcon build --packages-select <your_package_name>
. install/setup.bash
ros2 run <your_package_name> image_publisher
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import cv2
import numpy as np
from cv_bridge import CvBridge
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
self.bridge = CvBridge()
self.subscription_rgb = self.create_subscription(
Image,
'rgb_image',
self.listener_callback_rgb,
10)
self.subscription_rgb # prevent unused variable warning
self.subscription_depth = self.create_subscription(
Image,
'depth_image',
self.listener_callback_depth,
10)
self.subscription_depth # prevent unused variable warning
def listener_callback_rgb(self, msg):
cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
cv2.imshow('RGB Image', cv_image)
cv2.waitKey(1) # Display the image until a keypress
def listener_callback_depth(self, msg):
cv_image = self.bridge.imgmsg_to_cv2(msg, "16UC1")
# Convert depth image to a visual format for display
cv_image_visual = cv2.convertScaleAbs(cv_image, alpha=0.03) # Scale for visualization
cv2.imshow('Depth Image', cv_image_visual)
cv2.waitKey(1) # Display the image until a keypress
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
rclpy.spin(image_subscriber)
image_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
sensor_msgs/msg/CompressedImage
image_compressed_publisher.py
라는 이름으로 새 파일을 생성
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import numpy as np
import cv2
class CompressedImagePublisher(Node):
def __init__(self):
super().__init__('compressed_image_publisher')
self.publisher = self.create_publisher(CompressedImage, 'compressed_image', 10)
self.publisher_depth_compressed = self.create_publisher(CompressedImage, 'depth_image/compressed', 10)
timer_period = 1 # seconds
self.timer = self.create_timer(timer_period, self.timer_callback)
def timer_callback(self):
# 여기서는 예시로 OpenCV를 사용하여 랜덤 이미지를 생성합니다.
# 실제 응용에서는 카메라에서 이미지를 읽어와야 합니다.
# 랜덤 RGB 이미지 생성
image = np.random.randint(255, size=(480, 640, 3), dtype=np.uint8)
# OpenCV를 사용하여 이미지를 JPEG 형식으로 압축
"""
1. **retval (부울)**:
인코딩이 성공적으로 완료되었는지를 나타내는 부울 값
성공하면 `True`, 실패하면 `False`를 반환합니다.
2. **buf (바이트 배열)**:
인코딩된 이미지 데이터를 포함하는 메모리 버퍼
이 데이터는 파일 시스템에 저장하거나 네트워크를 통해 전송할 수 있습니다.
"""
result, encoded_image = cv2.imencode(ext='.jpg', img=image)
if result:
msg = CompressedImage()
msg.header.stamp = self.get_clock().now().to_msg()
msg.format = "jpeg"
msg.data = np.array(encoded_image).tobytes()
# 압축된 이미지 publish
self.publisher.publish(msg)
self.get_logger().info('Publishing compressed image')
####
# Depth 이미지 생성
depth_image = np.random.randint(255, size=(480, 640), dtype=np.uint8)
result, depth_image_compressed = cv2.imencode('.png', depth_image)
if result:
# Depth 이미지를 CompressedImage로 publish
msg_depth_compressed = CompressedImage()
msg_depth_compressed.format = 'png'
msg_depth_compressed.data = np.array(depth_image_compressed).tobytes()
self.publisher_depth_compressed.publish(msg_depth_compressed)
def main(args=None):
rclpy.init(args=args)
compressed_image_publisher = CompressedImagePublisher()
rclpy.spin(compressed_image_publisher)
compressed_image_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
setup.py
파일에 새로운 엔트리 포인트를 추가하고 패키지를 빌드해야 합니다.setup.py
파일을 열고 entry_points
섹션에 다음 줄을 추가합니다:'console_scripts': [
'compressed_image_publisher = your_package_name.image_compressed_publisher:main',
],
colcon build --packages-select <your_package_name>
. install/setup.bash
ros2 run <your_package_name> compressed_image_publisher
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import cv2
import numpy as np
class ImageSubscriber(Node):
def __init__(self):
super().__init__('image_subscriber')
self.subscription_rgb = self.create_subscription(
CompressedImage,
'rgb_image/compressed',
self.listener_callback_rgb,
10)
self.subscription_rgb # prevent unused variable warning
self.subscription_depth = self.create_subscription(
CompressedImage,
'depth_image/compressed',
self.listener_callback_depth,
10)
self.subscription_depth # prevent unused variable warning
def listener_callback_rgb(self, msg):
np_arr = np.frombuffer(msg.data, np.uint8)
image_np = cv2.imdecode(np_arr, cv2.IMREAD_COLOR) # Decode to color image
cv2.imshow('RGB Image', image_np)
cv2.waitKey(1) # Display the image until a keypress
def listener_callback_depth(self, msg):
np_arr = np.frombuffer(msg.data, np.uint8)
depth_image_np = cv2.imdecode(np_arr, cv2.IMREAD_ANYDEPTH) # Decode to depth image
cv2.imshow('Depth Image', depth_image_np)
cv2.waitKey(1) # Display the image until a keypress
def main(args=None):
rclpy.init(args=args)
image_subscriber = ImageSubscriber()
rclpy.spin(image_subscriber)
image_subscriber.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()