import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from sensor_msgs.msg import CompressedImage
import numpy as np
import cv2
from cv_bridge import CvBridge
import base64
# 이미지 메시지 데이터를 어레이 형태로 변환
bridge = CvBridge()
class ImageSubscriber(Node) :
def __init__(self) :
super().__init__('image_sub')
qos = QoSProfile(depth=10)
self.image_sub = self.create_subscription(
CompressedImage, # 임포트 된 메시지 타입
'/camera/image/compressed', # 토픽리스트에서 조회한 토픽 주소
self.image_callback, # 정의한 콜백함수
qos)
self.image = np.empty(shape=[1])
def image_callback(self, msg) :
img = bridge.compressed_imgmsg_to_cv2(msg)
cv2.imshow('ros_img', img)
cv2.waitKey(100)
def main(args=None) :
rclpy.init(args=args)
node = ImageSubscriber()
try :
rclpy.spin(node)
except KeyboardInterrupt :
node.get_logger().info('Stopped by Keyboard')
finally :
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__' :
main()
import roslibpy
import cv2
import base64
import logging
import time
from cv_bridge import CvBridge
import sys
bridge = CvBridge()
fmt = '%(asctime)s %(levelname)8s: %(message)s'
logging.basicConfig(format=fmt, level=logging.INFO)
log = logging.getLogger(__name__)
client = roslibpy.Ros(host='localhost', port=9090)
client.run()
publisher = roslibpy.Topic(client, '/camera/image/compressed', 'sensor_msgs/CompressedImage')
publisher.advertise()
cam = cv2.VideoCapture(1)
# def publish_image():
try:
while client.is_connected:
ret, frame = cam.read()
if not(frame is None):
cv2.imshow("frame", frame)
resized = cv2.resize(frame, (320, 320))
_, buffer = cv2.imencode('.jpeg', resized)
encoded = base64.b64encode(buffer).decode('ascii')
publisher.publish(dict(format='jpeg', data=encoded))
cv2.waitKey(100)
# time.sleep(0.1)
except Exception as e:
print("An error occurred:", type(e).__name__, "–", e)
finally:
cv2.destroyAllWindows()
cam.release()
publisher.unadvertise()
client.terminate()
sys.exit()
<워크스페이스>/src 에서 ros2 pkg create --build-type ament-python --node-name <노드이름> <패키지 이름>package 내 setup.py 수정ros2 launch rosbridge_server rosbridge_websocket_launch.xmlcolcon buildhttps://www.robotstory.co.kr/king/?mode=view&board_pid=898
https://95mkr.tistory.com/entry/ROS6
https://github.com/ros2/domain_bridge
https://github.com/ros2/ros1_bridge
https://github.com/jmguerreroh/ros2_computer_vision/tree/ros2-humble/src/computer_vision/src
https://deepdeepit.tistory.com/118
안녕하세요. publisher (Python (Non ROS))부분은 ROS 외부에서 파이썬 파일을 실행한다는 의미인가요? 직접 해보니 sensor_msgs 모듈을 찾을 수 없다고 나오면서 에러가 나는데 어떻게 해결 할 수 있는지 문의드립니다.