[ros2] CompressedImage VS Image

About_work·2024년 2월 21일
0

ros2

목록 보기
17/41

-1. 들어가기 전에

0. CompressedImage VS Image

  • 데이터의 압축 여부, 네트워크 대역폭 사용, 처리 시간, 이미지 품질과 같은 요소들이 차이점

Image로 Publish하는 경우

장점

  • 데이터 무결성:
    • Image 메시지는 압축되지 않기 때문에, 데이터의 손실 없이 원본 이미지 품질을 유지
    • 이는 고정밀 이미지 처리 작업에 중요할 수 있음
  • 압축 / 해체 없어, 지연시간 최소화:
    • 이미지가 압축되지 않았기 때문에, 수신 측에서 별도의 압축 해제 과정 없이 바로 이미지 데이터를 사용할 수 있음
    • 이는 실시간 시스템에서 지연 시간을 최소화하는 데 도움이 됨
    • 하지만 네트워크를 통한 전송 속도가 느려질 수 있음.

단점

  • 높은 대역폭 요구
    • 압축되지 않은 이미지 데이터는 크기가 상당히 크기 때문에, 네트워크 전송 시 더 많은 대역폭을 사용
    • 이는 특히 대용량 이미지나 비디오 스트림을 전송할 때 문제가 될 수 있음
  • 느린 전송 속도:
    • 더 큰 데이터 크기 때문에 이미지 전송에 더 많은 시간이 소요될 수 있으며, 이는 실시간 처리에 영향을 줄 수 있습니다.

CompressedImage로 Publish하는 경우

장점

  • 낮은 대역폭 사용:
    • CompressedImage 메시지는 이미지 데이터를 압축하여 네트워크를 통한 전송 시 필요한 대역폭을 크게 줄입니다.
    • 이는 특히 대용량 이미지나 고해상도 비디오 스트림을 처리할 때 유리
  • 빠른 전송 속도:
    • 데이터 크기가 작아지므로 이미지 전송 시간이 단축되어, 네트워크 지연이 감소하고 더 빠른 데이터 전송이 가능

단점

  • 데이터 손실 가능성:
    • 대부분의 이미지 압축 알고리즘은 손실 압축을 사용하기 때문에, 원본 이미지에서 일부 정보가 손실될 수 있습니다.
    • 이는 이미지 품질을 저하시킬 수 있으며, 일부 이미지 처리 작업에서는 문제가 될 수 있음
  • 압축 해제 필요:
    • 수신 측에서 압축된 이미지 데이터를 사용하기 전에 압축을 해제해야 함
    • 이는 추가적인 처리 시간을 필요로 하며, 시스템의 전체 지연 시간에 영향을 줄 수 있음

결론

  • Image 메시지는 원본 이미지 품질을 유지해야 하거나, 처리 지연 시간을 최소화해야 하는 경우에 적합합니다.
  • CompressedImage 메시지는 네트워크 대역폭이 제한적이거나, 대량의 이미지 데이터를 효율적으로 전송해야 하는 경우에 유리합니다.
  • 실제 사용 사례에 따라, 두 메시지 타입 중 하나를 선택할 수 있으며, 때로는 두 가지 방식을 병행하여 사용하는 것이 최적의 해결책일 수도 있습니다.
  • 예를 들어, 고해상도 비디오 스트림을 원격 위치로 전송할 때는 CompressedImage를 사용하고, 로컬에서 고정밀 이미지 처리를 수행할 때는 Image 메시지를 사용할 수 있습니다.

  • ROS 2에서 rclpy를 이용해 depth image와 RGB image를 publish하는 노드를 작성

1. sensor_msgs/msg/Image

  • 이 노드는 미리 정의된 depth image와 RGB image를 publish

1.2. Python 스크립트 작성

패키지 디렉토리의 your_package_name/your_package_name 폴더 안에 image_publisher.py라는 이름으로 파일을 생성. (아래 코드)

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import numpy as np
import cv2

class ImagePublisher(Node):
    def __init__(self):
        super().__init__('image_publisher')
        self.publisher_rgb = self.create_publisher(Image, 'rgb_image', 10)
        self.publisher_depth = self.create_publisher(Image, 'depth_image', 10)
        timer_period = 1  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)

    def timer_callback(self):
        # 여기서는 예시로 OpenCV를 사용하여 랜덤 이미지를 생성합니다.
        # 실제 응용에서는 카메라에서 이미지를 읽어와야 합니다.

        # RGB 이미지 생성
        rgb_image = np.random.randint(255, size=(480, 640, 3), dtype=np.uint8)
        msg_rgb = Image()
        msg_rgb.height = 480
        msg_rgb.width = 640
        msg_rgb.encoding = 'bgr8'
        msg_rgb.step = 640 * 3
        msg_rgb.data = rgb_image.tobytes()

        # Depth 이미지 생성
        depth_image = np.random.randint(255, size=(480, 640), dtype=np.uint8)
        msg_depth = Image()
        msg_depth.height = 480
        msg_depth.width = 640
        msg_depth.encoding = 'mono8'
        msg_depth.step = 640
        msg_depth.data = depth_image.tobytes()

        # 이미지 publish
        self.publisher_rgb.publish(msg_rgb)
        self.publisher_depth.publish(msg_depth)
        self.get_logger().info('Publishing images')

def main(args=None):
    rclpy.init(args=args)
    image_publisher = ImagePublisher()
    rclpy.spin(image_publisher)
    image_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

1.3. 패키지 설정

  • setup.py( Python 패키지의 설치를 위한 스크립트) 파일을 열고, entry_points에 다음 줄을 추가하여 노드가 실행 가능하게 만듭니다.
entry_points={
    'console_scripts': [
        'image_publisher = your_package_name.image_publisher:main',
    ],
},
  • your_package_name을 실제 패키지 이름으로 변경하세요.

1.4. 빌드 및 실행

  • 패키지를 빌드하려면, 터미널에서 패키지가 위치한 디렉토리로 이동한 다음 다음 명령어를 실행
colcon build --packages-select <your_package_name>
  • 빌드가 완료되면, 다음과 같이 노드를 실행할 수 있습니다:
. install/setup.bash
ros2 run <your_package_name> image_publisher

1.5. subscribe

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import Image
import cv2
import numpy as np
from cv_bridge import CvBridge

class ImageSubscriber(Node):
    def __init__(self):
        super().__init__('image_subscriber')
        self.bridge = CvBridge()
        self.subscription_rgb = self.create_subscription(
            Image,
            'rgb_image',
            self.listener_callback_rgb,
            10)
        self.subscription_rgb  # prevent unused variable warning

        self.subscription_depth = self.create_subscription(
            Image,
            'depth_image',
            self.listener_callback_depth,
            10)
        self.subscription_depth  # prevent unused variable warning

    def listener_callback_rgb(self, msg):
        cv_image = self.bridge.imgmsg_to_cv2(msg, "bgr8")
        cv2.imshow('RGB Image', cv_image)
        cv2.waitKey(1)  # Display the image until a keypress

    def listener_callback_depth(self, msg):
        cv_image = self.bridge.imgmsg_to_cv2(msg, "16UC1")
        # Convert depth image to a visual format for display
        cv_image_visual = cv2.convertScaleAbs(cv_image, alpha=0.03)  # Scale for visualization
        cv2.imshow('Depth Image', cv_image_visual)
        cv2.waitKey(1)  # Display the image until a keypress

def main(args=None):
    rclpy.init(args=args)
    image_subscriber = ImageSubscriber()
    rclpy.spin(image_subscriber)
    image_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

2. sensor_msgs/msg/CompressedImage

2.1. Python 스크립트 작성

image_compressed_publisher.py라는 이름으로 새 파일을 생성

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import numpy as np
import cv2

class CompressedImagePublisher(Node):
    def __init__(self):
        super().__init__('compressed_image_publisher')
        self.publisher = self.create_publisher(CompressedImage, 'compressed_image', 10)
        self.publisher_depth_compressed = self.create_publisher(CompressedImage, 'depth_image/compressed', 10)

        timer_period = 1  # seconds
        self.timer = self.create_timer(timer_period, self.timer_callback)

    def timer_callback(self):
        # 여기서는 예시로 OpenCV를 사용하여 랜덤 이미지를 생성합니다.
        # 실제 응용에서는 카메라에서 이미지를 읽어와야 합니다.
        
        # 랜덤 RGB 이미지 생성
        image = np.random.randint(255, size=(480, 640, 3), dtype=np.uint8)

        # OpenCV를 사용하여 이미지를 JPEG 형식으로 압축
        """
1. **retval (부울)**: 
	인코딩이 성공적으로 완료되었는지를 나타내는 부울 값
    성공하면 `True`, 실패하면 `False`를 반환합니다.
2. **buf (바이트 배열)**: 
	인코딩된 이미지 데이터를 포함하는 메모리 버퍼
    이 데이터는 파일 시스템에 저장하거나 네트워크를 통해 전송할 수 있습니다.
        """
        result, encoded_image = cv2.imencode(ext='.jpg', img=image)
        if result:
            msg = CompressedImage()
            msg.header.stamp = self.get_clock().now().to_msg()
            msg.format = "jpeg"
            msg.data = np.array(encoded_image).tobytes()

            # 압축된 이미지 publish
            self.publisher.publish(msg)
            self.get_logger().info('Publishing compressed image')
		####
        # Depth 이미지 생성
        depth_image = np.random.randint(255, size=(480, 640), dtype=np.uint8)
        result, depth_image_compressed = cv2.imencode('.png', depth_image)
		if result:
          # Depth 이미지를 CompressedImage로 publish
          msg_depth_compressed = CompressedImage()
          msg_depth_compressed.format = 'png'
          msg_depth_compressed.data = np.array(depth_image_compressed).tobytes()
          self.publisher_depth_compressed.publish(msg_depth_compressed)

def main(args=None):
    rclpy.init(args=args)
    compressed_image_publisher = CompressedImagePublisher()
    rclpy.spin(compressed_image_publisher)
    compressed_image_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

2.2. 노드 실행 방법

  • 이 스크립트를 패키지에 추가하고 실행하려면, 이전 답변에서 설명한 대로 setup.py 파일에 새로운 엔트리 포인트를 추가하고 패키지를 빌드해야 합니다.
  1. setup.py 파일을 열고 entry_points 섹션에 다음 줄을 추가합니다:
    'console_scripts': [
        'compressed_image_publisher = your_package_name.image_compressed_publisher:main',
    ],
  2. 패키지를 다시 빌드합니다:
    colcon build --packages-select <your_package_name>
  3. 빌드 후, 새로운 노드를 다음과 같이 실행합니다:
    . install/setup.bash
    ros2 run <your_package_name> compressed_image_publisher

2.3. subscribe

import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
import cv2
import numpy as np

class ImageSubscriber(Node):
    def __init__(self):
        super().__init__('image_subscriber')
        self.subscription_rgb = self.create_subscription(
            CompressedImage,
            'rgb_image/compressed',
            self.listener_callback_rgb,
            10)
        self.subscription_rgb  # prevent unused variable warning

        self.subscription_depth = self.create_subscription(
            CompressedImage,
            'depth_image/compressed',
            self.listener_callback_depth,
            10)
        self.subscription_depth  # prevent unused variable warning

    def listener_callback_rgb(self, msg):
        np_arr = np.frombuffer(msg.data, np.uint8)
        image_np = cv2.imdecode(np_arr, cv2.IMREAD_COLOR)  # Decode to color image
        cv2.imshow('RGB Image', image_np)
        cv2.waitKey(1)  # Display the image until a keypress

    def listener_callback_depth(self, msg):
        np_arr = np.frombuffer(msg.data, np.uint8)
        depth_image_np = cv2.imdecode(np_arr, cv2.IMREAD_ANYDEPTH)  # Decode to depth image
        cv2.imshow('Depth Image', depth_image_np)
        cv2.waitKey(1)  # Display the image until a keypress

def main(args=None):
    rclpy.init(args=args)
    image_subscriber = ImageSubscriber()
    rclpy.spin(image_subscriber)
    image_subscriber.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

profile
새로운 것이 들어오면 이미 있는 것과 충돌을 시도하라.

0개의 댓글