ROS #22 Hangman game

남생이·2024년 10월 23일

ROS

목록 보기
22/28

1. 구조도

2. code 역할 설명

  • letter_publisher: LetterPublisher 클래스 객체 생성 후 노드 실행 - 매초마다 알파벳 순차적 발행

  • progress_action_client: GameProgress 클라이언트 생성, goal 전송, 피드백 처리 -> goal에 대한 응답을 받으면 결과를 비동기적으로 요청하여 결과를 처리

  • progress_action_server: GameProgress 기반으로 서버를 생성하여 수신, 게임 상태를 저장하고 데이터를 수신 --> 피드백 생성 및 현재 게임상태와 시도횟수 제공

  • user_input: CheckLetter 클라이언트 생성, 서비스 실행 전 대기, 서비스 요청을 저장하여 send_request 메서드를 호출 --> 특정 문자를 서버로 전달

  • word_service: check_letter, letter_topic, progress의 서비스와 토픽을 통해 서비스 생성, 구독자 생성, 퍼블리셔 생성 --> 무작위로 단어를 선택 후 수신한 메시지를 처리, 저장하여 진행상황을 토픽으로 전달

  1. LetterPublisher:

    • LetterPublisher 클래스는 알파벳을 순차적으로 발행하는 노드입니다.
    • 매초마다 알파벳을 순차적으로 letter_topic이라는 토픽으로 발행합니다.
    • 알파벳이 a부터 z까지 반복됩니다.
  2. ProgressActionClient:

    • GameProgress 액션 클라이언트를 생성하여 서버와 통신합니다.
    • 목표(goal)을 전송하고 서버에서 피드백을 수신합니다.
    • 서버로부터 goal에 대한 응답을 받으면 결과를 비동기적으로 요청하여 처리합니다.
    • 최종적으로 게임의 결과(승리 또는 패배)를 처리합니다.
  3. ProgressActionServer:

    • GameProgress 액션 서버를 생성하여 클라이언트로부터 goal을 수신하고 처리합니다.
    • 게임 상태를 관리하며, progress 토픽을 통해 게임 진행 상태를 구독합니다.
    • 피드백을 생성하여 현재 게임 상태와 남은 시도 횟수를 클라이언트에 제공합니다.
    • 게임 종료 시, 승리 여부에 따라 결과를 반환합니다.
  4. UserInput:

    • CheckLetter 서비스 클라이언트를 생성하여 사용자의 입력을 처리하는 노드입니다.
    • 서비스가 실행되기 전까지 대기하며, 사용자가 입력한 특정 문자를 서버로 전송합니다.
    • send_request 메서드를 통해 서버에 입력한 문자를 전달하고, 이를 바탕으로 단어 상태를 업데이트합니다.
  5. WordService:

    • check_letter, letter_topic, progress와 관련된 서비스 및 토픽을 관리하는 노드입니다.
    • 무작위로 단어를 선택하고, 사용자가 입력한 문자를 처리하여 단어의 현재 상태를 업데이트합니다.
    • 사용자의 입력 결과를 Progress 메시지로 발행하여 게임의 진행 상황(현재 단어 상태, 남은 시도 횟수, 게임 종료 여부 등)을 다른 노드에 전달합니다.

전체 시스템은 각 노드가 상호작용하며 행맨 게임의 흐름을 제어하는 구조로, 사용자 입력과 게임의 진행 상태를 관리하고 결과를 통신합니다.

3. 코드 생성

3.1 hangman_interfaces

  • CheckLetter.srv
# Empty request
---
string updated_word_state
bool is_correct
string message
  • Progress.msg
string current_state
int32 attempts_left
bool game_over
bool won
  • GameProgress.action
# Goal
# Empty since the client doesn't need to send any data
---
# Result
bool game_over
bool won
---
# Feedback
bool game_over

3.2 letter_publisher.py

# hangman_game/letter_publisher.py

import rclpy
from rclpy.node import Node
from std_msgs.msg import String

class LetterPublisher(Node):

    def __init__(self):
        super().__init__('letter_publisher')
        self.publisher_ = self.create_publisher(String, 'letter_topic', 10)
        self.timer = self.create_timer(1.0, self.publish_letter)
        self.current_letter = ord('a') # 문자 a를 아스키 코드값으로 변환하여 저장

    def publish_letter(self):
        msg = String()
        msg.data = chr(self.current_letter)
        self.publisher_.publish(msg)
        self.get_logger().info(f'Publishing: {msg.data}')
        self.current_letter += 1
        if self.current_letter > ord('z'):
            self.current_letter = ord('a')

def main(args=None):
    rclpy.init(args=args)
    letter_publisher = LetterPublisher()
    rclpy.spin(letter_publisher)
    letter_publisher.destroy_node()
    rclpy.shutdown()

if __name__ == '__main__':
    main()

3.3 word_service.py

# hangman_game/word_service.py

import rclpy
from rclpy.node import Node
from hangman_interfaces.srv import CheckLetter
from hangman_interfaces.msg import Progress
from std_msgs.msg import String
import random


class WordService(Node):

    def __init__(self):
        super().__init__("word_service")
        self.service = self.create_service(
            CheckLetter, "check_letter", self.check_letter_callback
        )
        self.subscription = self.create_subscription(
            String, "letter_topic", self.letter_callback, 10
        )
        self.progress_publisher = self.create_publisher(Progress, "progress", 10)

        data = Progress()
        data.current_state = ""
        data.attempts_left = 20
        data.game_over = False
        data.won = False
        self.progress_publisher.publish(data)

        self.current_letter = ""
        self.word_list = ["python", "hangman", "robot", "ros", "interface"]
        self.word = random.choice(self.word_list)
        self.word_state = ["_"] * len(self.word)
        self.get_logger().info(f"The word has {len(self.word)} letters.")
        self.attempts_left = 20  # Max attempts

    def letter_callback(self, msg):
        self.current_letter = msg.data

    def check_letter_callback(self, request, response):
        letter = self.current_letter
        if letter in self.word:
            for idx, char in enumerate(self.word):
                if char == letter:
                    self.word_state[idx] = letter
            response.is_correct = True
            response.message = "Correct!"
        else:
            self.attempts_left -= 1
            response.is_correct = False
            response.message = "WRONG"
        response.updated_word_state = "".join(self.word_state)
        self.get_logger().info(f"Received letter: {letter}")
        self.get_logger().info(f"Current word state: {response.updated_word_state}")

        # Publish progress
        progress_msg = Progress()
        progress_msg.current_state = response.updated_word_state
        progress_msg.attempts_left = self.attempts_left
        progress_msg.game_over = "_" not in self.word_state or self.attempts_left <= 0
        progress_msg.won = "_" not in self.word_state
        self.progress_publisher.publish(progress_msg)

        return response


def main(args=None):
    rclpy.init(args=args)
    word_service = WordService()
    rclpy.spin(word_service)
    word_service.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()

3.5 user_input.py

# hangman_game/user_input.py

import rclpy
from rclpy.node import Node
from hangman_interfaces.srv import CheckLetter
import threading


class UserInput(Node):

    def __init__(self):
        super().__init__("user_input")
        self.cli = self.create_client(CheckLetter, "check_letter")
        while not self.cli.wait_for_service(timeout_sec=1.0):
            self.get_logger().info("Service not available, waiting...")
        self.req = CheckLetter.Request()
        self.get_logger().info("Press Enter to check the current letter.")
        threading.Thread(target=self.input_thread, daemon=True).start()

    def input_thread(self):
        while True:
            input("Press Enter to input the current letter.")
            self.send_request()

    def send_request(self):
        future = self.cli.call_async(self.req)
        # future.add_done_callback(self.callback_future)

    # def callback_future(self, future):
    #     response = future.result()
    #     self.get_logger().info(f'{response.message}')
    #     self.get_logger().info(f'Word State: {response.updated_word_state}')


def main(args=None):
    rclpy.init(args=args)
    user_input = UserInput()
    rclpy.spin(user_input)
    user_input.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()

3.6 progress_action_client.py

# hangman_game/progress_action_client.py

import rclpy
from rclpy.node import Node
from hangman_interfaces.action import GameProgress
from rclpy.action import ActionClient


class ProgressActionClient(Node):

    def __init__(self):
        super().__init__("progress_action_client")
        self._action_client = ActionClient(self, GameProgress, "game_progress")
        self.result_received = False
        self.send_goal()

    def send_goal(self):
        self.get_logger().info("Action Client: Waiting for action server...")
        self._action_client.wait_for_server()
        goal_msg = GameProgress.Goal()
        self.get_logger().info("Action Client: Sending goal request...")
        self._send_goal_future = self._action_client.send_goal_async(
            goal_msg, feedback_callback=self.feedback_callback
        )
        self._send_goal_future.add_done_callback(self.goal_response_callback)

    def feedback_callback(self, feedback_msg):
        feedback = feedback_msg.feedback
        if feedback.game_over:
            self.get_logger().info("Action Client: Game over detected in feedback")

    def goal_response_callback(self, future):
        goal_handle = future.result()
        if not goal_handle.accepted:
            self.get_logger().info("Action Client: Goal rejected")
            self.result_received = True
            return

        self.get_logger().info("Action Client: Goal accepted")
        self._get_result_future = goal_handle.get_result_async()
        self._get_result_future.add_done_callback(self.get_result_callback)

    def get_result_callback(self, future):
        result = future.result().result
        if result.won:
            self.get_logger().info("Action Client: Congratulations! You won!")
        else:
            self.get_logger().info("Action Client: Game Over. You lost.")
        self.result_received = True


def main(args=None):
    rclpy.init(args=args)
    action_client = ProgressActionClient()
    while rclpy.ok():
        rclpy.spin_once(action_client)
        if action_client.result_received:
            break
    action_client.destroy_node()
    rclpy.shutdown()


if __name__ == "__main__":
    main()

3.7 progress_action_server.py

# hangman_game/progress_action_server.py

import rclpy
from rclpy.node import Node
from hangman_interfaces.action import GameProgress
from hangman_interfaces.msg import Progress
from rclpy.action import ActionServer
from rclpy.executors import MultiThreadedExecutor
import time
import threading


class ProgressActionServer(Node):

    def __init__(self):
        super().__init__("progress_action_server")
        self._action_server = ActionServer(
            self, GameProgress, "game_progress", self.execute_callback
        )
        self.current_progress = Progress()
        self.progress_received_event = threading.Event()

        # Subscribe to the 'progress' topic to get game updates
        self.subscription = self.create_subscription(
            Progress, "progress", self.progress_callback, 10
        )
        self.subscription  # prevent unused variable warning

        self.get_logger().info("Action Server Initialized")
        self.get_logger().info(f"GAME OVER: {self.current_progress.game_over}")
        self.get_logger().info(f"WON: {self.current_progress.won}")

    def progress_callback(self, msg):
        self.current_progress = msg
        self.get_logger().info(
            f"Progress updated: {self.current_progress.current_state}"
        )

    def execute_callback(self, goal_handle):
        self.get_logger().info("Action Server: Received goal request")
        feedback_msg = GameProgress.Feedback()
        update_rate = 1.0  # seconds

        while not self.current_progress.game_over:
            # Publish feedback
            feedback_msg.game_over = self.current_progress.game_over
            goal_handle.publish_feedback(feedback_msg)
            self.get_logger().info(
                f"Current State: {self.current_progress.current_state}"
            )
            self.get_logger().info(
                f"Attempts Left: {self.current_progress.attempts_left}"
            )

            # Sleep to wait for next update
            time.sleep(update_rate)

            # Check if the goal has been canceled
            # if goal_handle.is_cancel_requested:
            #     self.get_logger().info('Action Server: Goal canceled')
            #     goal_handle.canceled()
            #     return GameProgress.Result()

        # Game is over
        result = GameProgress.Result()
        result.game_over = self.current_progress.game_over
        result.won = self.current_progress.won

        if self.current_progress.won:
            self.get_logger().info("Action Server: Congratulations! You won!")
        else:
            self.get_logger().info("Action Server: Game Over. You lost.")

        goal_handle.succeed()
        self.get_logger().info("Action Server: Goal succeeded")
        return result


def main(args=None):
    rclpy.init(args=args)
    action_server = ProgressActionServer()

    # Use MultiThreadedExecutor to allow concurrent callbacks
    executor = MultiThreadedExecutor()
    executor.add_node(action_server)
    try:
        executor.spin()
    except KeyboardInterrupt:
        pass
    finally:
        action_server.destroy_node()
        rclpy.shutdown()


if __name__ == "__main__":
    main()

4. setup.py

from setuptools import find_packages, setup

package_name = 'hangman_game'

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=[
        ('share/ament_index/resource_index/packages',
            ['resource/' + package_name]),
        ('share/' + package_name, ['package.xml']),
    ],
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='user',
    maintainer_email='mh9716@kookmin.ac.kr',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': [
            'letter_publisher = hangman_game.letter_publisher:main',
            'word_service = hangman_game.word_service:main',
            'user_input = hangman_game.user_input:main',
            'progress_action_server = hangman_game.progress_action_server:main',
            'progress_action_client = hangman_game.progress_action_client:main',
        ],
    },
)

5. CMakeLists.txt

cmake_minimum_required(VERSION 3.8)
project(hangman_interfaces)

if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang")
  add_compile_options(-Wall -Wextra -Wpedantic)
endif()

# find dependencies
find_package(ament_cmake REQUIRED)
find_package(builtin_interfaces REQUIRED)
find_package(rosidl_default_generators REQUIRED)

ament_export_dependencies(rosidl_default_runtime)

# uncomment the following section in order to fill in
# further dependencies manually.
# find_package(<dependency> REQUIRED)

set(msg_files
  "msg/Progress.msg"
)

set(srv_files
  "srv/CheckLetter.srv"
)

set(action_files
  "action/GameProgress.action"
)

rosidl_generate_interfaces(${PROJECT_NAME}
  ${msg_files}
  ${srv_files}
  ${action_files}
  DEPENDENCIES builtin_interfaces
)

if(BUILD_TESTING)
  find_package(ament_lint_auto REQUIRED)
  # the following line skips the linter which checks for copyrights
  # comment the line when a copyright and license is added to all source files
  set(ament_cmake_copyright_FOUND TRUE)
  # the following line skips cpplint (only works in a git repo)
  # comment the line when this package is in a git repo and when
  # a copyright and license is added to all source files
  set(ament_cmake_cpplint_FOUND TRUE)
  ament_lint_auto_find_test_dependencies()
endif()

ament_package()

profile
공부하는 거북이

0개의 댓글