ROS2 - Python(Non-ros)간 Image publish, subscribe

조홍기·2023년 12월 7일

ros

목록 보기
4/8

subscriber (ROS2 humble)

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from sensor_msgs.msg import CompressedImage
import numpy as np
import cv2
from cv_bridge import CvBridge
import base64

# 이미지 메시지 데이터를 어레이 형태로 변환
bridge = CvBridge() 

class ImageSubscriber(Node) :
  def __init__(self) :
    super().__init__('image_sub')
    qos = QoSProfile(depth=10)
    self.image_sub = self.create_subscription(
      CompressedImage, # 임포트 된 메시지 타입 
      '/camera/image/compressed', # 토픽리스트에서 조회한 토픽 주소
      self.image_callback, # 정의한 콜백함수
      qos)
    self.image = np.empty(shape=[1])

  def image_callback(self, msg) :
    img = bridge.compressed_imgmsg_to_cv2(msg)
    cv2.imshow('ros_img', img)
    cv2.waitKey(100)
     
def main(args=None) :
  rclpy.init(args=args)
  node = ImageSubscriber()

  try :
    rclpy.spin(node)
  except KeyboardInterrupt :
    node.get_logger().info('Stopped by Keyboard')
  finally :
    node.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__' :
  main()

publisher (Python (Non ROS))


import roslibpy
import cv2
import base64
import logging
import time
from cv_bridge import CvBridge
import sys


bridge = CvBridge() 

fmt = '%(asctime)s %(levelname)8s: %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
log = logging.getLogger(__name__)

client = roslibpy.Ros(host='localhost', port=9090)
client.run()

publisher = roslibpy.Topic(client, '/camera/image/compressed', 'sensor_msgs/CompressedImage')
publisher.advertise()


cam = cv2.VideoCapture(1)

# def publish_image():

try:
    while client.is_connected:
        ret, frame = cam.read()

        if not(frame is None):
            cv2.imshow("frame", frame)
            resized = cv2.resize(frame, (320, 320))
            _, buffer = cv2.imencode('.jpeg', resized)
            encoded = base64.b64encode(buffer).decode('ascii')
            publisher.publish(dict(format='jpeg', data=encoded))
        cv2.waitKey(100)
        # time.sleep(0.1)

except Exception as e:
    print("An error occurred:", type(e).__name__, "–", e)
finally:
    cv2.destroyAllWindows()
    cam.release()
    publisher.unadvertise()
    client.terminate()
    sys.exit()

사용법

  1. <워크스페이스>/src 에서 ros2 pkg create --build-type ament-python --node-name <노드이름> <패키지 이름>
  2. 패키지 내 subscriber python 파일 작성
  3. packagesetup.py 수정
  4. ros2 launch rosbridge_server rosbridge_websocket_launch.xml
  5. colcon build
  6. bash파일 source
  7. ros2 통해서 subscriber 실행
  8. publisher 작성 후 실행

참고 링크

https://www.robotstory.co.kr/king/?mode=view&board_pid=898
https://95mkr.tistory.com/entry/ROS6

추후 multi port subscribe를 위해 남기는 링크

https://github.com/ros2/domain_bridge
https://github.com/ros2/ros1_bridge
https://github.com/jmguerreroh/ros2_computer_vision/tree/ros2-humble/src/computer_vision/src
https://deepdeepit.tistory.com/118

profile
ROS, Python, Cpp 공부 중입니다.

1개의 댓글

comment-user-thumbnail
2024년 6월 4일

안녕하세요. publisher (Python (Non ROS))부분은 ROS 외부에서 파이썬 파일을 실행한다는 의미인가요? 직접 해보니 sensor_msgs 모듈을 찾을 수 없다고 나오면서 에러가 나는데 어떻게 해결 할 수 있는지 문의드립니다.

답글 달기