
letter_publisher: LetterPublisher 클래스 객체 생성 후 노드 실행 - 매초마다 알파벳 순차적 발행
progress_action_client: GameProgress 클라이언트 생성, goal 전송, 피드백 처리 -> goal에 대한 응답을 받으면 결과를 비동기적으로 요청하여 결과를 처리
progress_action_server: GameProgress 기반으로 서버를 생성하여 수신, 게임 상태를 저장하고 데이터를 수신 --> 피드백 생성 및 현재 게임상태와 시도횟수 제공
user_input: CheckLetter 클라이언트 생성, 서비스 실행 전 대기, 서비스 요청을 저장하여 send_request 메서드를 호출 --> 특정 문자를 서버로 전달
word_service: check_letter, letter_topic, progress의 서비스와 토픽을 통해 서비스 생성, 구독자 생성, 퍼블리셔 생성 --> 무작위로 단어를 선택 후 수신한 메시지를 처리, 저장하여 진행상황을 토픽으로 전달
LetterPublisher:
LetterPublisher 클래스는 알파벳을 순차적으로 발행하는 노드입니다.letter_topic이라는 토픽으로 발행합니다.a부터 z까지 반복됩니다.ProgressActionClient:
GameProgress 액션 클라이언트를 생성하여 서버와 통신합니다.ProgressActionServer:
GameProgress 액션 서버를 생성하여 클라이언트로부터 goal을 수신하고 처리합니다.progress 토픽을 통해 게임 진행 상태를 구독합니다.UserInput:
CheckLetter 서비스 클라이언트를 생성하여 사용자의 입력을 처리하는 노드입니다.send_request 메서드를 통해 서버에 입력한 문자를 전달하고, 이를 바탕으로 단어 상태를 업데이트합니다.WordService:
check_letter, letter_topic, progress와 관련된 서비스 및 토픽을 관리하는 노드입니다.Progress 메시지로 발행하여 게임의 진행 상황(현재 단어 상태, 남은 시도 횟수, 게임 종료 여부 등)을 다른 노드에 전달합니다.전체 시스템은 각 노드가 상호작용하며 행맨 게임의 흐름을 제어하는 구조로, 사용자 입력과 게임의 진행 상태를 관리하고 결과를 통신합니다.

# Empty request
---
string updated_word_state
bool is_correct
string message
string current_state
int32 attempts_left
bool game_over
bool won
# Goal
# Empty since the client doesn't need to send any data
---
# Result
bool game_over
bool won
---
# Feedback
bool game_over
# hangman_game/letter_publisher.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class LetterPublisher(Node):
def __init__(self):
super().__init__('letter_publisher')
self.publisher_ = self.create_publisher(String, 'letter_topic', 10)
self.timer = self.create_timer(1.0, self.publish_letter)
self.current_letter = ord('a') # 문자 a를 아스키 코드값으로 변환하여 저장
def publish_letter(self):
msg = String()
msg.data = chr(self.current_letter)
self.publisher_.publish(msg)
self.get_logger().info(f'Publishing: {msg.data}')
self.current_letter += 1
if self.current_letter > ord('z'):
self.current_letter = ord('a')
def main(args=None):
rclpy.init(args=args)
letter_publisher = LetterPublisher()
rclpy.spin(letter_publisher)
letter_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
# hangman_game/word_service.py
import rclpy
from rclpy.node import Node
from hangman_interfaces.srv import CheckLetter
from hangman_interfaces.msg import Progress
from std_msgs.msg import String
import random
class WordService(Node):
def __init__(self):
super().__init__("word_service")
self.service = self.create_service(
CheckLetter, "check_letter", self.check_letter_callback
)
self.subscription = self.create_subscription(
String, "letter_topic", self.letter_callback, 10
)
self.progress_publisher = self.create_publisher(Progress, "progress", 10)
data = Progress()
data.current_state = ""
data.attempts_left = 20
data.game_over = False
data.won = False
self.progress_publisher.publish(data)
self.current_letter = ""
self.word_list = ["python", "hangman", "robot", "ros", "interface"]
self.word = random.choice(self.word_list)
self.word_state = ["_"] * len(self.word)
self.get_logger().info(f"The word has {len(self.word)} letters.")
self.attempts_left = 20 # Max attempts
def letter_callback(self, msg):
self.current_letter = msg.data
def check_letter_callback(self, request, response):
letter = self.current_letter
if letter in self.word:
for idx, char in enumerate(self.word):
if char == letter:
self.word_state[idx] = letter
response.is_correct = True
response.message = "Correct!"
else:
self.attempts_left -= 1
response.is_correct = False
response.message = "WRONG"
response.updated_word_state = "".join(self.word_state)
self.get_logger().info(f"Received letter: {letter}")
self.get_logger().info(f"Current word state: {response.updated_word_state}")
# Publish progress
progress_msg = Progress()
progress_msg.current_state = response.updated_word_state
progress_msg.attempts_left = self.attempts_left
progress_msg.game_over = "_" not in self.word_state or self.attempts_left <= 0
progress_msg.won = "_" not in self.word_state
self.progress_publisher.publish(progress_msg)
return response
def main(args=None):
rclpy.init(args=args)
word_service = WordService()
rclpy.spin(word_service)
word_service.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
# hangman_game/user_input.py
import rclpy
from rclpy.node import Node
from hangman_interfaces.srv import CheckLetter
import threading
class UserInput(Node):
def __init__(self):
super().__init__("user_input")
self.cli = self.create_client(CheckLetter, "check_letter")
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info("Service not available, waiting...")
self.req = CheckLetter.Request()
self.get_logger().info("Press Enter to check the current letter.")
threading.Thread(target=self.input_thread, daemon=True).start()
def input_thread(self):
while True:
input("Press Enter to input the current letter.")
self.send_request()
def send_request(self):
future = self.cli.call_async(self.req)
# future.add_done_callback(self.callback_future)
# def callback_future(self, future):
# response = future.result()
# self.get_logger().info(f'{response.message}')
# self.get_logger().info(f'Word State: {response.updated_word_state}')
def main(args=None):
rclpy.init(args=args)
user_input = UserInput()
rclpy.spin(user_input)
user_input.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
# hangman_game/progress_action_client.py
import rclpy
from rclpy.node import Node
from hangman_interfaces.action import GameProgress
from rclpy.action import ActionClient
class ProgressActionClient(Node):
def __init__(self):
super().__init__("progress_action_client")
self._action_client = ActionClient(self, GameProgress, "game_progress")
self.result_received = False
self.send_goal()
def send_goal(self):
self.get_logger().info("Action Client: Waiting for action server...")
self._action_client.wait_for_server()
goal_msg = GameProgress.Goal()
self.get_logger().info("Action Client: Sending goal request...")
self._send_goal_future = self._action_client.send_goal_async(
goal_msg, feedback_callback=self.feedback_callback
)
self._send_goal_future.add_done_callback(self.goal_response_callback)
def feedback_callback(self, feedback_msg):
feedback = feedback_msg.feedback
if feedback.game_over:
self.get_logger().info("Action Client: Game over detected in feedback")
def goal_response_callback(self, future):
goal_handle = future.result()
if not goal_handle.accepted:
self.get_logger().info("Action Client: Goal rejected")
self.result_received = True
return
self.get_logger().info("Action Client: Goal accepted")
self._get_result_future = goal_handle.get_result_async()
self._get_result_future.add_done_callback(self.get_result_callback)
def get_result_callback(self, future):
result = future.result().result
if result.won:
self.get_logger().info("Action Client: Congratulations! You won!")
else:
self.get_logger().info("Action Client: Game Over. You lost.")
self.result_received = True
def main(args=None):
rclpy.init(args=args)
action_client = ProgressActionClient()
while rclpy.ok():
rclpy.spin_once(action_client)
if action_client.result_received:
break
action_client.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
# hangman_game/progress_action_server.py
import rclpy
from rclpy.node import Node
from hangman_interfaces.action import GameProgress
from hangman_interfaces.msg import Progress
from rclpy.action import ActionServer
from rclpy.executors import MultiThreadedExecutor
import time
import threading
class ProgressActionServer(Node):
def __init__(self):
super().__init__("progress_action_server")
self._action_server = ActionServer(
self, GameProgress, "game_progress", self.execute_callback
)
self.current_progress = Progress()
self.progress_received_event = threading.Event()
# Subscribe to the 'progress' topic to get game updates
self.subscription = self.create_subscription(
Progress, "progress", self.progress_callback, 10
)
self.subscription # prevent unused variable warning
self.get_logger().info("Action Server Initialized")
self.get_logger().info(f"GAME OVER: {self.current_progress.game_over}")
self.get_logger().info(f"WON: {self.current_progress.won}")
def progress_callback(self, msg):
self.current_progress = msg
self.get_logger().info(
f"Progress updated: {self.current_progress.current_state}"
)
def execute_callback(self, goal_handle):
self.get_logger().info("Action Server: Received goal request")
feedback_msg = GameProgress.Feedback()
update_rate = 1.0 # seconds
while not self.current_progress.game_over:
# Publish feedback
feedback_msg.game_over = self.current_progress.game_over
goal_handle.publish_feedback(feedback_msg)
self.get_logger().info(
f"Current State: {self.current_progress.current_state}"
)
self.get_logger().info(
f"Attempts Left: {self.current_progress.attempts_left}"
)
# Sleep to wait for next update
time.sleep(update_rate)
# Check if the goal has been canceled
# if goal_handle.is_cancel_requested:
# self.get_logger().info('Action Server: Goal canceled')
# goal_handle.canceled()
# return GameProgress.Result()
# Game is over
result = GameProgress.Result()
result.game_over = self.current_progress.game_over
result.won = self.current_progress.won
if self.current_progress.won:
self.get_logger().info("Action Server: Congratulations! You won!")
else:
self.get_logger().info("Action Server: Game Over. You lost.")
goal_handle.succeed()
self.get_logger().info("Action Server: Goal succeeded")
return result
def main(args=None):
rclpy.init(args=args)
action_server = ProgressActionServer()
# Use MultiThreadedExecutor to allow concurrent callbacks
executor = MultiThreadedExecutor()
executor.add_node(action_server)
try:
executor.spin()
except KeyboardInterrupt:
pass
finally:
action_server.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()
from setuptools import find_packages, setup
package_name = 'hangman_game'
setup(
name=package_name,
version='0.0.0',
packages=find_packages(exclude=['test']),
data_files=[
('share/ament_index/resource_index/packages',
['resource/' + package_name]),
('share/' + package_name, ['package.xml']),
],
install_requires=['setuptools'],
zip_safe=True,
maintainer='user',
maintainer_email='mh9716@kookmin.ac.kr',
description='TODO: Package description',
license='TODO: License declaration',
tests_require=['pytest'],
entry_points={
'console_scripts': [
'letter_publisher = hangman_game.letter_publisher:main',
'word_service = hangman_game.word_service:main',
'user_input = hangman_game.user_input:main',
'progress_action_server = hangman_game.progress_action_server:main',
'progress_action_client = hangman_game.progress_action_client:main',
],
},
)
cmake_minimum_required(VERSION 3.8)
project(hangman_interfaces)
if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
add_compile_options(-Wall -Wextra -Wpedantic)
endif()
# find dependencies
find_package(ament_cmake REQUIRED)
find_package(builtin_interfaces REQUIRED)
find_package(rosidl_default_generators REQUIRED)
ament_export_dependencies(rosidl_default_runtime)
# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)
set(msg_files
"msg/Progress.msg"
)
set(srv_files
"srv/CheckLetter.srv"
)
set(action_files
"action/GameProgress.action"
)
rosidl_generate_interfaces(${PROJECT_NAME}
${msg_files}
${srv_files}
${action_files}
DEPENDENCIES builtin_interfaces
)
if(BUILD_TESTING)
find_package(ament_lint_auto REQUIRED)
# the following line skips the linter which checks for copyrights
# comment the line when a copyright and license is added to all source files
set(ament_cmake_copyright_FOUND TRUE)
# the following line skips cpplint (only works in a git repo)
# comment the line when this package is in a git repo and when
# a copyright and license is added to all source files
set(ament_cmake_cpplint_FOUND TRUE)
ament_lint_auto_find_test_dependencies()
endif()
ament_package()
