spin_once
로 콜백함수를 한번만 실행시킴 import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile # 퍼블리셔의 QoS 설정
from std_msgs.msg import String # 퍼블리시 메시지 타입
class TTS_Publisher(Node): # Node 클래스 상속
def __init__(self):
super().__init__('TTS_Publisher') # 노드 이름 지정
qos_profile = QoSProfile(depth=10) # 퍼블리시할 데이터를 버퍼에 10개까지 저장
self.tts_publisher = self.create_publisher(String, 'tts_message', qos_profile)
self.timer = self.create_timer(1, self.publish_msg) # 콜백함수 : n초마다 지정한 콜백함수 실행
self.count = 0
def publish_msg(self):
msg = String() # 퍼블리시할 메시지
msg.data = '송민영'.format(self.count) # 메시지 저장
self.tts_publisher.publish(msg) # 메시지 퍼블리시
self.get_logger().info('Published message: {0}'.format(msg.data)) # 콘솔창에 출력 (==print함수)
def main(args=None):
rclpy.init(args=args) # 초기화
node = TTS_Publisher()
try:
node.get_logger().info("spin될까?")
rclpy.spin_once(node) # 콜백함수 실행
node.get_logger().info("spin된다!!")
except KeyboardInterrupt: # 'Ctrl+c'와 같은 인터럽트 시그널 예외 상황
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node() # 노드 소멸
rclpy.shutdown() # 함수 종료
if __name__ == '__main__':
main()
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
class TTS_Subscriber(Node):
def __init__(self):
super().__init__('TTS_Subscriber') # Node 클래스 생성자 호출
qos_profile = QoSProfile(depth=10) # 서브스크라이브 데이터를 버퍼에 10개까지 저장
self.tts_subscriber = self.create_subscription(
String, # 토픽 메시지 타입
'tts_message', # 토픽 이름
self.subscribe_topic_message, # 콜백 함수
qos_profile) # QoS 설정
def subscribe_topic_message(self, msg) :
msg_to_tts(msg.data)
def msg_to_tts(msg):
import os
import sys
import urllib.request
import time
# import playsound
client_id = "###" # API id
client_secret = "###" # API PW
msg = msg # msg 받을 경우
encText = urllib.parse.quote("%s에게 물품을 주세요"%(msg))
data = "speaker=nara&volume=0&speed=0&pitch=0&format=mp3&text=" + encText;
url = "https://naveropenapi.apigw.ntruss.com/tts-premium/v1/tts"
request = urllib.request.Request(url)
request.add_header("X-NCP-APIGW-API-KEY-ID",client_id)
request.add_header("X-NCP-APIGW-API-KEY",client_secret)
response = urllib.request.urlopen(request, data=data.encode('utf-8'))
rescode = response.getcode()
if response.getcode()==200:
print("CSS 성공! 파일을 저장합니다.")
response_body = response.read()
file_name = "음성파일"
with open(file_name + ".mp3", 'wb') as f:
f.write(response_body)
print("파일명:", file_name)
# playsound.playsound(file_name+".mp3")
else:
print("Error Code:" + rescode)
def main(args=None):
rclpy.init(args=args)
node = TTS_Subscriber()
try:
rclpy.spin(node)
except KeyboardInterrupt:
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
entry_points={
'console_scripts': [
'tts_publisher = first_tts_pkg.tts_publisher:main',
'tts_subscriber = first_tts_pkg.tts_subscriber:main',
],
},
$ ros2 run first_tts_pkg tts_publisher
$ ros2 run first_tts_pkg tts_subscriber
( 내부 카메라 파이캠 촬영 → 사원DB 일치여부 확인) → 수신 → 스피커 송출
파이캠으로 촬영해서 운송장 이미지 인식, 촬영
촬영 시 ocr_publisher.py 실행
publish
ROS는 토픽 정보를 publish하면 같은 네트워크 상의 모든 노드에서 subscribe 할 수 있음. 토픽 이름만 맞추면 됨!
# ocr_publisher.py
# 전송할 메시지 설정
if name : # 이름이 출력 되면
self.message = name
if name == "" : # 이름이 출력되지 않았다면
self.message = "다시 촬영해주세요."
(SLAM → 도착 여부) 수신 → 스피커 송출
☆↗↗
로봇 도착(navigation) → Face detection 일치 여부 결과 → 수신 → 스피커 송출 (배송완료) → 배송완료 사실 navigation으로 송출
import rclpy
from rclpy.node import Node
from rclpy.qos import QoSProfile
from std_msgs.msg import String
class Face_result_Publish(Node):
def __init__(self):
super().__init__('face_result_publisher')
qos_profile = QoSProfile(depth=10)
self.face_result_publisher = self.create_publisher(String, 'face_result' ,qos_profile)
self.timer = self.create_timer(1, self.publish_msg)
self.count = 0
def publish_msg(self):
msg = String()
msg.data = "from rasberrian?"
self.get_logger().info('Published message: {0}'.format(msg.data))
self.count += 1
def main(args = None):
rclpy.init(args=args) # 초기화
node = Face_result_Publish()
try:
rclpy.spin(node) # 콜백함수 실행
except KeyboardInterrupt: # 'Ctrl+c'와 같은 인터럽트 시그널 예외 상황
node.get_logger().info('Keyboard Interrupt (SIGINT)')
finally:
node.destroy_node() # 노드 소멸
rclpy.shutdown() # 함수 종료
if __name__ == '__main__':
main()