서비스 프로그래밍(Python)

두부김치·2024년 2월 14일
1

ROS2

목록 보기
19/29

1. 서비스(Service)

서비스는 동기식 양방향 메시지 송수신 방식으로 서비스의 요청(Request)을 하는 쪽을 서비스 클라이언트(Service Client), 요청받은 서비스를 수행한 후 서비스의 응답(Response)을 하는 쪽을 서비스 서버(Service Server)라고 한다. 결국 서비스는 특정 요청을 하는 클라이언트 단과 요청받은 일을 수행한 후에 결괏값을 전달하는 서버 단과의 통신이다.

이번 장에서는 아래 그림과 같이 서비스 요청을 하는 서비스 클라이언트와 서비스 응답을 하는 서비스 서버 코드를 작성해 볼 것이다. 여기서 서비스 요청 값으로는 연산자(+, -, *, /)를 임의로 선택 후에 보낼 것이고 기존에 저장한 변수 a, b를 요청 값으로 받은 연산자로 계산하여 결괏값을 서비스 응답 값으로 보내는 프로그램을 짜볼 것이다
서비스에 대한 개념이 부족하다면

서비스 이해하기

위 링크를 먼저 보고오자.

2. 서비스 서버 코드

이 코드는 이전 장에서 토픽 서브스크라이브를 설명하였을 때 소개했던 노드로써 토픽 서브스크라이브, 서비스 서버, 액션 서버를 모두 포함하고 있어서 매우 길다. 따라서 전체 소스코드중 서비스 서버 부분만 분석해보겠다.

import time

from msg_srv_action_interface_example.action import ArithmeticChecker
from msg_srv_action_interface_example.msg import ArithmeticArgument
from msg_srv_action_interface_example.srv import ArithmeticOperator
from rclpy.action import ActionServer
from rclpy.callback_groups import ReentrantCallbackGroup
from rclpy.node import Node
from rclpy.qos import QoSDurabilityPolicy
from rclpy.qos import QoSHistoryPolicy
from rclpy.qos import QoSProfile
from rclpy.qos import QoSReliabilityPolicy


class Calculator(Node):

    def __init__(self):
        super().__init__('calculator')
        self.argument_a = 0.0
        self.argument_b = 0.0
        self.argument_operator = 0
        self.argument_result = 0.0
        self.argument_formula = ''
        self.operator = ['+', '-', '*', '/']
        self.callback_group = ReentrantCallbackGroup()

        self.declare_parameter('qos_depth', 10)
        qos_depth = self.get_parameter('qos_depth').value

        QOS_RKL10V = QoSProfile(
            reliability=QoSReliabilityPolicy.RELIABLE,
            history=QoSHistoryPolicy.KEEP_LAST,
            depth=qos_depth,
            durability=QoSDurabilityPolicy.VOLATILE)

        self.arithmetic_argument_subscriber = self.create_subscription(
            ArithmeticArgument,
            'arithmetic_argument',
            self.get_arithmetic_argument,
            QOS_RKL10V,
            callback_group=self.callback_group)

        self.arithmetic_service_server = self.create_service(
            ArithmeticOperator,
            'arithmetic_operator',
            self.get_arithmetic_operator,
            callback_group=self.callback_group)

        self.arithmetic_action_server = ActionServer(
            self,
            ArithmeticChecker,
            'arithmetic_checker',
            self.execute_checker,
            callback_group=self.callback_group)

    def get_arithmetic_argument(self, msg):
        self.argument_a = msg.argument_a
        self.argument_b = msg.argument_b
        self.get_logger().info('Timestamp of the message: {0}'.format(msg.stamp))
        self.get_logger().info('Subscribed argument a: {0}'.format(self.argument_a))
        self.get_logger().info('Subscribed argument b: {0}'.format(self.argument_b))

    def get_arithmetic_operator(self, request, response):
        self.argument_operator = request.arithmetic_operator
        self.argument_result = self.calculate_given_formula(
            self.argument_a,
            self.argument_b,
            self.argument_operator)
        response.arithmetic_result = self.argument_result
        self.argument_formula = '{0} {1} {2} = {3}'.format(
                self.argument_a,
                self.operator[self.argument_operator-1],
                self.argument_b,
                self.argument_result)
        self.get_logger().info(self.argument_formula)
        return response

    def calculate_given_formula(self, a, b, operator):
        if operator == ArithmeticOperator.Request.PLUS:
            self.argument_result = a + b
        elif operator == ArithmeticOperator.Request.MINUS:
            self.argument_result = a - b
        elif operator == ArithmeticOperator.Request.MULTIPLY:
            self.argument_result = a * b
        elif operator == ArithmeticOperator.Request.DIVISION:
            try:
                self.argument_result = a / b
            except ZeroDivisionError:
                self.get_logger().error('ZeroDivisionError!')
                self.argument_result = 0.0
                return self.argument_result
        else:
            self.get_logger().error(
                'Please make sure arithmetic operator(plus, minus, multiply, division).')
            self.argument_result = 0.0
        return self.argument_result

    def execute_checker(self, goal_handle):
        self.get_logger().info('Execute arithmetic_checker action!')
        feedback_msg = ArithmeticChecker.Feedback()
        feedback_msg.formula = []
        total_sum = 0.0
        goal_sum = goal_handle.request.goal_sum
        while total_sum < goal_sum:
            total_sum += self.argument_result
            feedback_msg.formula.append(self.argument_formula)
            self.get_logger().info('Feedback: {0}'.format(feedback_msg.formula))
            goal_handle.publish_feedback(feedback_msg)
            time.sleep(1)
        goal_handle.succeed()
        result = ArithmeticChecker.Result()
        result.all_formula = feedback_msg.formula
        result.total_sum = total_sum
        return result

calculator 노드는 서브스크라이브하여 저장하고 있는 멤버 변수 a, b 와 operator 노드로부터 요청값으로 받은 연산자를 이용하여 연산한다. 그리고 operator 노드에게 연산의 결괏값을 서비스 응답값으로 보낸다.

서비스 서버 관련 코드는 서비스 서버를 선언하는 부분과 콜백 함수를 선언하는 부분으로 나뉜다. arithmetic_service_serverNode 클래스의 create_service 함수를 이용하여 서비스 서버로 선언된다. 서비스의 타입은 ArithmeticOperator, 서비스 이름은 arithmetic_operator, 서비스 클라이언트로부터 서비스 요청이 있으면 실행되는 콜백 함수는 get_arithmetic_operator 로 지정했으며 멀티 스레드 병렬 콜백 함수의 실행을 위해 callback_group 을 설정하였다.

self.arithmetic_service_server = self.create_service(
            ArithmeticOperator,  #서비스 타입
            'arithmetic_operator',  # 서비스 이름
            self.get_arithmetic_operator, # 서비스 요청시 실행되는 함수
            callback_group=self.callback_group)

이러한 설정들은 서비스 서버를 위한 기본 설정이고, 실제 서비스 요청에 해당되는 특정 수행 코드가 수행되는 부분은 get_arithmetic_operator 콜백 함수이다.

콜백 함수 부분을 살펴보면 제일 먼저 request 와 response 매개 변수가 보이는데 이는 ArithmeticOperator 클래스로 생성된 인터페이스로 서비스 요청에 해당되는 request와 서비스 응답에 해당되는 response 로 구분된다. get_arithmetic_operator 함수는 서비스 요청이 있을 때 실행되는 콜백 함수이다. 여기서는 서비스 요청(operator 노드) 시 요청값으로 받은 연산자와 이전에 토픽 서브스크라이버가 토픽값으로 전달받아 저장해둔 멤버 변수 a,b 를 연산한 후 그 결괏값을 서비스 응답값으로 반환한다.

이 함수에서 request.arithmetic_operatorargument_operator 멤버 변수에 저장하고 calculate_given_formula 함수의 인자로 넘겨주는 코드를 확인할 수 있다. 그리고 calculate_given_formula 함수로부터 전달받은 연산 결괏값을 response.arithmetic_result 에 저장한다. 끝으로 관련 수식을 문자열로 표현하여 get_logger().info() 함수를 통해 화면에 표시한다.

def get_arithmetic_operator(self, request, response):
        self.argument_operator = request.arithmetic_operator
        self.argument_result = self.calculate_given_formula(
            self.argument_a,
            self.argument_b,
            self.argument_operator)
        response.arithmetic_result = self.argument_result
        self.argument_formula = '{0} {1} {2} = {3}'.format(
                self.argument_a,
                self.operator[self.argument_operator-1],
                self.argument_b,
                self.argument_result)
        self.get_logger().info(self.argument_formula)
        return response

calculate_given_formula 함수는 앞서 설명한 것과 같이 매개변수 a, b를 가지고 주어진 연산자 operator에 따라 연산하고 그 결괏값을 반환한다.

 def calculate_given_formula(self, a, b, operator):
        if operator == ArithmeticOperator.Request.PLUS:
            self.argument_result = a + b
        elif operator == ArithmeticOperator.Request.MINUS:
            self.argument_result = a - b
        elif operator == ArithmeticOperator.Request.MULTIPLY:
            self.argument_result = a * b
        elif operator == ArithmeticOperator.Request.DIVISION:
            try:
                self.argument_result = a / b
            except ZeroDivisionError: #예외 처리
                self.get_logger().error('ZeroDivisionError!')
                self.argument_result = 0.0
                return self.argument_result
        else:
            self.get_logger().error(
                'Please make sure arithmetic operator(plus, minus, multiply, division).')
            self.argument_result = 0.0
        return self.argument_result

3. 서비스 서버 실행 코드

서비스 서버를 실행 하는 main 코드는 다음과 같다.

import rclpy
from rclpy.executors import MultiThreadedExecutor

from topic_service_action_rclpy_example.calculator.calculator import Calculator


def main(args=None):
    rclpy.init(args=args)
    try:
        calculator = Calculator()
        executor = MultiThreadedExecutor(num_threads=4)
        executor.add_node(calculator)
        try:
            executor.spin()
        except KeyboardInterrupt:
            calculator.get_logger().info('Keyboard Interrupt (SIGINT)')
        finally:
            executor.shutdown()
            calculator.arithmetic_action_server.destroy()
            calculator.destroy_node()
    finally:
        rclpy.shutdown()


if __name__ == '__main__':
    main()

4. 서비스 클라이언트 코드

서비스 클라이언트 역할을 하는 operator 노드의 전체 코드는 다음과 같다.

import random

from msg_srv_action_interface_example.srv import ArithmeticOperator
import rclpy
from rclpy.node import Node


class Operator(Node):

    def __init__(self):
        super().__init__('operator')

        self.arithmetic_service_client = self.create_client(
            ArithmeticOperator,
            'arithmetic_operator')

        while not self.arithmetic_service_client.wait_for_service(timeout_sec=0.1):
            self.get_logger().warning('The arithmetic_operator service not available.')

    def send_request(self):
        service_request = ArithmeticOperator.Request()
        service_request.arithmetic_operator = random.randint(1, 4)
        futures = self.arithmetic_service_client.call_async(service_request)
        return futures


def main(args=None):
    rclpy.init(args=args)
    operator = Operator()
    future = operator.send_request()
    user_trigger = True
    try:
        while rclpy.ok():
            if user_trigger is True:
                rclpy.spin_once(operator)
                if future.done():
                    try:
                        service_response = future.result()
                    except Exception as e:  # noqa: B902
                        operator.get_logger().warn('Service call failed: {}'.format(str(e)))
                    else:
                        operator.get_logger().info(
                            'Result: {}'.format(service_response.arithmetic_result))
                        user_trigger = False
            else:
                input('Press Enter for next service call.')
                future = operator.send_request()
                user_trigger = True

    except KeyboardInterrupt:
        operator.get_logger().info('Keyboard Interrupt (SIGINT)')

    operator.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

Operator 클래스는 rclpy.node 모듈의 Node 클래스를 상속받고 있으며 생성자를 통해 노드 이름을 operator로 초기화했다.
그 뒤 서비스 클라이언트를 선언해주는데 이는 Node 클래스의 create_client 함수를 이용했다. 서비스의 타입으로 서비스 서버와 동일하게 ArithmeticOperator를 선언하였고, 서비스 이름은arithmetic_operator 로 선언하였다. arithmetic_service_client 의 wait_for_service 함수는 서비스 요청을 할 수 있는 상태인지 알아보기 위해 서비스 서버가 실행되어 있는지 확인하는 함수로 0.1초 간격으로 서비스 서버가 실행되어 있는지 확인하게 된다.

class Operator(Node):

    def __init__(self):
        super().__init__('operator')

        self.arithmetic_service_client = self.create_client(
            ArithmeticOperator,
            'arithmetic_operator')

        while not self.arithmetic_service_client.wait_for_service(timeout_sec=0.1):
            self.get_logger().warning('The arithmetic_operator service not available.')

우리가 작성하고 있는 서비스 클라이언트의 목적은 서비스 서버에게 연산에 필요한 연산자를 보내는 것이다. 이 send_request 함수가 실질적인 서비스 클라이언트의 실행 코드로 서비스 서버에게 서비스 요청값을 보내고 이에 해당하는 응답값을 받게 된다. 함수 내용을 살펴보자. 우선 서비스 요청값을 보내기 위해 우리가 미리 작성해둔 서비스 인터페이스인 ArithmetiOperator.Request 클래스로 service_request를 선언하였다. 그리고 서비스 요청값으로 random.randint 함수를 이용하여 특정 연산자를 service_request.arithmetic_operator 변수에 저장하였다. 그 뒤 call_async(service_request) 함수로 서비스 요청을 수행하게 설정하였다. 끝으로 서비스 상태 및 응답값을 담은 futures를 반환하게 된다.

				ArithmeticOperator 인터페이스
# Constants
int8 PLUS = 1
int8 MINUS = 2
int8 MULTIPLY = 3
int8 DIVISION = 4

# Request
int8 arithmetic_operator
---
# Response
float32 arithmetic_result
   def send_request(self):
        service_request = ArithmeticOperator.Request()
        service_request.arithmetic_operator = random.randint(1, 4)
        futures = self.arithmetic_service_client.call_async(service_request)
        return futures

5. 노드 실행 코드

서비스 클라이언트 노드인 operator 노드는 'topic_service_action_rclpy_example' 패키지의 일부로 패키지 설정 파일에 'entry_points'로 실행 가능한 콘솔 스크립트의 이름과 호출 함수를 기입하도록 되어 있는데 우리는 하기와 같이 4개의 노드를 작성하고 'ros2 run' 과 같은 노드 실행 명령어를 통하여 각각의 노드를 실행시키고 있다. operator 노드는 topic_service_action_rclpy_example 패키지의 arithmetic 폴더에 operator.py의 main문에 실행 코드가 담겨져 있다.

entry_points={
        'console_scripts': [
            'argument = topic_service_action_rclpy_example.arithmetic.argument:main',
            'operator = topic_service_action_rclpy_example.arithmetic.operator:main',
            'calculator = topic_service_action_rclpy_example.calculator.main:main',
            'checker = topic_service_action_rclpy_example.checker.main:main',
        ],
    },

아래의 main 함수 안의 실행 코드를 보면 rclpy.init 를 이용하여 프로그램을 초기화하고 위에서 작성한 Operator 클래스를 operator 객체로 생성한다. 그리고 operator.send_request 함수를 호출하여 서비스 요청을 보내고 future 변수를 통해 응답값을 받을 준비를 한다.

그 뒤 rclpy.spin_once 함수를 이용하여 생성한 노드를 주기적으로 spin시켜 지정된 콜백함수가 실행될 수 있도록 하고 있다. 이때 매 spin마다 노드의 콜백함수가 실행되고 서비스 응답값을 받았을 때 future의 done 함수를 이용해 요청값을 제대로 받았는지 확인 후 결괏값은 service_response = future.result() 같이 service_response라는 변수에 저장하여 사용하게 된다. 서비스 응답값은 get_logger().info() 함수를 이용하여 화면에 서비스 응답값에 해당되는 연산 결괏값을 표시하는 것이다. 그리고 종료 Ctrl + c와 같은 인터럽트 시그널 예외 상황에서는 operator를 소멸시키고 rclpy.shutdown 함수로 노드를 종료하게 된다.

def main(args=None):
    rclpy.init(args=args)
    operator = Operator()
    future = operator.send_request()
    user_trigger = True
    try:
        while rclpy.ok():
            if user_trigger is True:
                rclpy.spin_once(operator)
                if future.done():
                    try:
                        service_response = future.result()
                    except Exception as e:  # noqa: B902
                        operator.get_logger().warn('Service call failed: {}'.format(str(e)))
                    else:
                        operator.get_logger().info(
                            'Result: {}'.format(service_response.arithmetic_result))
                        user_trigger = False
            else:
                input('Press Enter for next service call.')
                future = operator.send_request()
                user_trigger = True

    except KeyboardInterrupt:
        operator.get_logger().info('Keyboard Interrupt (SIGINT)')

    operator.destroy_node()
    rclpy.shutdown()


if __name__ == '__main__':
    main()

서비스 클라이언트는 토픽 퍼블리셔와 달리 지속적으로 수행하지 않고 한 번만 수행하도록 했다. 처음 노드가 실행 되면 임의의 연산자가 서비스 요청값으로 서비스 서버에게 전달된다. 서비스 요청값을 전달 이후 서비스 요청을 다시하려면 operator 노드를 실행시킨 터미널 창에서 엔터키를 누르면 된다. 그러면 연산자가 랜덤으로 선택되어 그 결괏값이 달라짐을 확인할 수 있다.

  future = operator.send_request()
    user_trigger = True

    try:
        while rclpy.ok():
            if user_trigger is True:

                # 서비스 응답값 확인 코드

            else:
                input('Press Enter for next service call.')
                future = operator.send_request()  # 서비스 요청값 전송
                user_trigger = True

오늘은 서비스 서버와 클라이언트 코드를 작성해보았다. 다음시간에는 액션 서버와 클라이언트 작성을 해보겠다.

profile
Robotics

0개의 댓글