ROS2 package - operator(4)

이준혁·2024년 12월 23일

ROS2

목록 보기
8/14

제가 작성하는 이 글은 제가 공부하기 위해 작성하고 제 학교 후배들에게 조금이라도 ros에 대해서 접하게 하고자 작성하게 되었습니다. 그러니 만약 틀린내용이나 그러니 제가 부족한 부분이 있을수 있으니 주의해주면서 봐주시면 감사하겠습니다 -Happy Lee-



지금은 이제 Operator 부분을 설명하도록 하겠다.

코드

import random

from study_msg.srv import ArithmeticOperator
import rclpy
from rclpy.node import Node


class Operator(Node):

    def __init__(self):
        super().__init__('operator')

        self.arithmetic_service_client = self.create_client(
            ArithmeticOperator,
            'arithmetic_operator')

        while not self.arithmetic_service_client.wait_for_service(timeout_sec=0.1):
            self.get_logger().warning('The arithmetic_operator service not available.')

    def send_request(self):
        service_request = ArithmeticOperator.Request()
        service_request.arithmetic_operator = random.randint(1, 4)
        futures = self.arithmetic_service_client.call_async(service_request)
        return futures


def main(args=None):
    rclpy.init(args=args)
    operator = Operator()
    future = operator.send_request()
    user_trigger = True
    try:
        while rclpy.ok():
            if user_trigger is True:
                rclpy.spin_once(operator)
                if future.done():
                    try:
                        service_response = future.result()
                    except Exception as e:  # noqa: B902
                        operator.get_logger().warn('Service call failed: {}'.format(str(e)))
                    else:
                        operator.get_logger().info(
                            'Result: {}'.format(service_response.arithmetic_result))
                        user_trigger = False
            else:
                input('Press Enter for next service call.')
                future = operator.send_request()
                user_trigger = True

    except KeyboardInterrupt:
        operator.get_logger().info('Keyboard Interrupt (SIGINT)')

    operator.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

operator 클래스 부분

class Operator(Node):

    def __init__(self):
        super().__init__('operator')

        self.arithmetic_service_client = self.create_client(
            ArithmeticOperator,
            'arithmetic_operator')

        while not self.arithmetic_service_client.wait_for_service(timeout_sec=0.1):
            self.get_logger().warning('The arithmetic_operator service not available.')

super().init('operator') : operator를 노드 이름을 설정한다.
self.create_client : 서비스 타입과 이 이름을 기반으로 서버를 찾는 것이다.

wait_for_service(timeout_sec=0.1) : 이거는 서비스 서버가 활성화 된 상태를 판단하는 것으로 최대 대기 시간을 설정하여 0.1초마다 서버의 상태를 확인합니다.
만약 지나가게 되면 logger.warning으로 로그 경고 메시지가 나온다.

    def send_request(self):
        service_request = ArithmeticOperator.Request()
        service_request.arithmetic_operator = random.randint(1, 4)
        futures = self.arithmetic_service_client.call_async(service_request)
        return futures

service_request = ArithmeticOperator.Request() : 서비스 요청 객체를 생성합니다.
service_request.arithmetic_operator : 1에서 4까지의 랜덤값을 지정합니다. 이것은 그 메시지 내에서 어떤 값을 받아온다
futures = self.arithmetic_service_client.call_async(service_request) :

여기서 상세히 들어가면 call_async이 비동기로 작동되어 service_request를 전송합니다 응답이 완려되면 futures를 반환하게 됩니다.

main 부분

def main(args=None):
    rclpy.init(args=args)
    operator = Operator()
    future = operator.send_request()
    user_trigger = True
    try:
        while rclpy.ok():
            if user_trigger is True:
                rclpy.spin_once(operator)
                if future.done():
                    try:
                        service_response = future.result()
                    except Exception as e:  # noqa: B902
                        operator.get_logger().warn('Service call failed: {}'.format(str(e)))
                    else:
                        operator.get_logger().info(
                            'Result: {}'.format(service_response.arithmetic_result))
                        user_trigger = False
            else:
                input('Press Enter for next service call.')
                future = operator.send_request()
                user_trigger = True

    except KeyboardInterrupt:
        operator.get_logger().info('Keyboard Interrupt (SIGINT)')

    operator.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

그냥 처음은 객체 생성해어 메서드를 호출하여 첫번째 서비스 요청을 하여 비동기로 전송하여 future로 반환합니다
user_trigger = True : 이거는 사용자 입력 enter키을 통해 추가 요청을 할지를 물어보는 것으로 true로 하여 첫번쨰 요청을 자동으로 처리할수 있도록 한다.
while rclpy.ok() : 노드가 실행 가능한 상태면 반복실행이 된다.

rclpy.spin_once : 함수로 노드를 주기적으로 spin시켜 지정된 콜백함수가 실행되어 이제 들어가게된다.

그렇게 요청결과가 준비가 되면 service_response 값이 반환이 되고 따로 예외처리를 둔다.

그리고 서비스 응답이 성공적으로 반환이 되면 즉 except 부분이 넘어가게되면 결과값이 출력하게 된다.

플래그를 False로 설정하여 다음 요청을 사용자가 직접 트리거하도록 설정합니다.

그리고 else에서 다시 기다리고 true일시 enter를 눌러 행동함 여기서 플래그를 True로 함

그리고 전체 끝나면 끝나게 된다.

profile
#자기공부 #틀린것도많음 #자기개발 여러분 인생이 힘들다 하더라도 그것을 깨는 순간 큰 희열감으로 옵니다~

0개의 댓글