[ROS2] 서비스 로봇에 TTS 적용하기 (과정 메모)

송민영·2022년 1월 2일
0
post-custom-banner

스크립트


  • 운송장 확인 → 택배 프로세스 시작
    • 촬영 완료되었습니다.
    • 일치하는 사원을 찾을 수 없습니다.
  • 목표 장소 출발/도착
    • %s 님께 출발하겠습니다
    • %s 님, 도착했습니다
  • 얼굴인식
    • 화면을 정면으로 응시해주세요
    • %s 님과 일치하지 않습니다 : 3초 이상 no match 발생 시
    • 인증되었습니다 : 3초 이상 true
      • 물품을 수거해주세요
      • 저에게 물품을 주세요~

TTS publisher / subscriber


  1. first_tts_pkg 생성
  2. tts_publisher.py, tts_subscriber.py 생성
    1. publisher
      • spin_once 로 콜백함수를 한번만 실행시킴
        import rclpy
        from rclpy.node import Node
        from rclpy.qos import QoSProfile # 퍼블리셔의 QoS 설정
        from std_msgs.msg import String # 퍼블리시 메시지 타입
        
        class TTS_Publisher(Node): # Node 클래스 상속
        
            def __init__(self):
                super().__init__('TTS_Publisher') # 노드 이름 지정
                qos_profile = QoSProfile(depth=10) # 퍼블리시할 데이터를 버퍼에 10개까지 저장
                self.tts_publisher = self.create_publisher(String, 'tts_message', qos_profile)
                self.timer = self.create_timer(1, self.publish_msg) # 콜백함수 : n초마다 지정한 콜백함수 실행
                self.count = 0
        
            def publish_msg(self):
                msg = String() # 퍼블리시할 메시지
                msg.data = '송민영'.format(self.count) # 메시지 저장
                self.tts_publisher.publish(msg) # 메시지 퍼블리시
                self.get_logger().info('Published message: {0}'.format(msg.data)) # 콘솔창에 출력 (==print함수)
        
        def main(args=None):
            rclpy.init(args=args) # 초기화
            node = TTS_Publisher()
            try:
                node.get_logger().info("spin될까?")
                rclpy.spin_once(node) # 콜백함수 실행
                node.get_logger().info("spin된다!!")
        
            except KeyboardInterrupt: # 'Ctrl+c'와 같은 인터럽트 시그널 예외 상황
                node.get_logger().info('Keyboard Interrupt (SIGINT)')
            finally:
                node.destroy_node() # 노드 소멸
                rclpy.shutdown() # 함수 종료
        
        if __name__ == '__main__':
            main()
        
  1. subscriber
    • TTS 모듈 가져오기
    • msg.data 값으로 메시지 받기
        import rclpy
        from rclpy.node import Node
        from rclpy.qos import QoSProfile
        from std_msgs.msg import String
        
        class TTS_Subscriber(Node):
        
            def __init__(self):
                super().__init__('TTS_Subscriber') # Node 클래스 생성자 호출
                qos_profile = QoSProfile(depth=10) # 서브스크라이브 데이터를 버퍼에 10개까지 저장
                self.tts_subscriber = self.create_subscription(
                    String, # 토픽 메시지 타입
                    'tts_message', # 토픽 이름
                    self.subscribe_topic_message, # 콜백 함수
                    qos_profile) # QoS 설정
        
            def subscribe_topic_message(self, msg) :
                msg_to_tts(msg.data)
        
        def msg_to_tts(msg):
        
            import os
            import sys
            import urllib.request
            import time
            # import playsound
        
            client_id = "###" # API id
            client_secret = "###" # API PW 
        
            msg = msg # msg 받을 경우
            encText = urllib.parse.quote("%s에게 물품을 주세요"%(msg))
        
            data = "speaker=nara&volume=0&speed=0&pitch=0&format=mp3&text=" + encText;
            url = "https://naveropenapi.apigw.ntruss.com/tts-premium/v1/tts"
        
            request = urllib.request.Request(url)
            request.add_header("X-NCP-APIGW-API-KEY-ID",client_id)
            request.add_header("X-NCP-APIGW-API-KEY",client_secret)
        
            response = urllib.request.urlopen(request, data=data.encode('utf-8'))
            rescode = response.getcode()
        
            if response.getcode()==200:
                print("CSS 성공! 파일을 저장합니다.")
                response_body = response.read()
                file_name = "음성파일"
                with open(file_name + ".mp3", 'wb') as f:
                    f.write(response_body)
                print("파일명:", file_name)
                # playsound.playsound(file_name+".mp3")
            else:
                print("Error Code:" + rescode)
        
        def main(args=None):
            rclpy.init(args=args)
            node = TTS_Subscriber()
            try:
                rclpy.spin(node)
            except KeyboardInterrupt:
                node.get_logger().info('Keyboard Interrupt (SIGINT)')
            finally:
                node.destroy_node()
                rclpy.shutdown()
        
        if __name__ == '__main__':
            main()
  1. setup.py 파일의 entry_points 수정
    entry_points={
            'console_scripts': [
                'tts_publisher = first_tts_pkg.tts_publisher:main',
                'tts_subscriber = first_tts_pkg.tts_subscriber:main',
            ],
        },
  1. terminal 창에서 실행
    $ ros2 run first_tts_pkg tts_publisher
    $ ros2 run first_tts_pkg tts_subscriber

운송장


( 내부 카메라 파이캠 촬영 → 사원DB 일치여부 확인) → 수신 → 스피커 송출

  1. 파이캠으로 촬영해서 운송장 이미지 인식, 촬영

    • 로봇 디스플레이에 촬영 버튼 추가
      • 젯슨나노 (파이캠) ↔ 디스플레이 : OCR은 화면 연결 아예 X (그냥 화면 꺼지는걸로)
        TTS만 연동 가능. ⇒ 촬영 시작은 어떻게?
        - Face detection을 써야하는 이유 : 젯슨나노에서 Face detection이 안돌아감
  2. 촬영 시 ocr_publisher.py 실행

    • rqt로 ‘촬영’ 신호 받으면 main에서 ocr_pub 또는 py 실행
  3. publish

    ROS는 토픽 정보를 publish하면 같은 네트워크 상의 모든 노드에서 subscribe 할 수 있음. 토픽 이름만 맞추면 됨!

    # ocr_publisher.py
    
    # 전송할 메시지 설정
            if name : # 이름이 출력 되면
                self.message = name
            if name == "" : # 이름이 출력되지 않았다면
                self.message = "다시 촬영해주세요."
  • 정보 추출 여부 (OCR이 수행 될 수 있는 이미지가 촬영되었는지)
    - 실패 시 ‘다시 촬영해주세요’
    - ⇒ TTS
    - OCR, 얼굴인식 등 각 기능의 토픽 메시지 이름을 다르게 설정해서 하나의 패키지에서 관련 정보를 모두 수신하도록 함.
    • 사원 존재 여부
      • 존재하지 않을 시 ‘존재하지 않는 사원입니다’
      • 기존 계획은 사원 DB와 비교하는 것이었지만 모두 존재한다고 가정 / 이름이 전달되는 수준으로 구현
    • 사원 정보
      • 스크린 : name을 포함한 사원 정보 출력
        • String 말고 다른 형태의 메시지 전송하는법
      • APP (수령인) : 수령인에게 알림 송출
      • ⇒ navigation

목표 장소 출발/도착


(SLAM → 도착 여부) 수신 → 스피커 송출

☆↗↗

얼굴인식


로봇 도착(navigation) → Face detection 일치 여부 결과 → 수신 → 스피커 송출 (배송완료) → 배송완료 사실 navigation으로 송출

  • Face detection 일치 여부
    • navigaion으로부터 name 값 받음
    • 스크린(라즈베리안)쪽에서 수동으로 입력해서 젯슨나노로 보내줌
    • while 문 :
      • n초 이상 ‘Not match’
        • ⇒ TTS : ‘정면을 바라봐주세요.’
      • ‘Match’
        • ⇒ TTS : ‘%s님, 인증되었습니다.’
        • face_subscriber = 1이면 face_publisher.py 실행하며 끝

import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String

class Face_result_Publish(Node):

	def __init__(self):
		super().__init__('face_result_publisher')
		qos_profile = QoSProfile(depth=10)
		self.face_result_publisher = self.create_publisher(String, 'face_result' ,qos_profile)
		self.timer = self.create_timer(1, self.publish_msg)
		self.count = 0 
	def publish_msg(self):
		msg = String()
		msg.data = "from rasberrian?"
		self.get_logger().info('Published message: {0}'.format(msg.data))
		self.count += 1

def main(args = None):
    rclpy.init(args=args) # 초기화 
    node = Face_result_Publish() 
    try:
        rclpy.spin(node) # 콜백함수 실행 
    except KeyboardInterrupt: # 'Ctrl+c'와 같은 인터럽트 시그널 예외 상황 
        node.get_logger().info('Keyboard Interrupt (SIGINT)')
    finally: 
        node.destroy_node() # 노드 소멸 
        rclpy.shutdown() # 함수 종료

if __name__ == '__main__':
    main()
post-custom-banner

0개의 댓글