ROS #12 service client & server code

남생이·2024년 10월 22일

ROS

목록 보기
12/28

1. service

ex_calculator 패키지에서는 연산자를 임의 선택 후 전송하고 기존 저장된 변수 a,b를 계산하여 결과값을 서비스 응답값으로 전달하는 프로그램을 구성한다.

2. Service 관련 명령어

'''service'''
$ ros2 service list # 서비스 목록 확인
$ ros2 service type [서비스 이름]  # 서비스의 타입 확인

$ ros2 service call [서비스 이름][서비스 타입]"{값}" # 특정 서비스르 호출
$ ros2 service call /reset std_srvs/srv/Empty
$ ros2 service call /turtle1/set_pen turtlesim/srv/SetPen "{r: 255, g: 255, width: 10}"

$ ros2 service call /kill turtlesim/srv/Kill "name: 'turtle1'"
$ ros2 service call /spawn turtlesim/srv/Spawn "{x: 5.5, y: 9.0, theta: 1.57, name: 'leonardo'}"

$ ros2 service find [서비스 타입] # 지정 서비스 타입의 서비스 출력

3. Service client - operator.py

3.1 import

  • .srv을 불러와 msg 타입을 정의
import random
from ros_study_msgs.srv import ArithmeticOperator
import rclpy
from rclpy.node import Node

3.2 class 정의

  • operator이름으로 노드 정의
  • create_client로 client 생성
  • arithmetic_operator 이름의 서비스를 호출하는 클라이언트 생성
  • ArithmeticOperator 타입의 요청을 받고 응답하는 서버와 통신
class Operator(Node):
    def __init__(self):
        super().__init__('operator')
        self.arithmetic_service_client = self.create_client(
            ArithmeticOperator,
            'arithmetic_operator')
            
        while not self.arithmetic_service_client.wait_for_service(timeout_sec=0.1):
            self.get_logger().warning('The arithmetic_operator service not available.')

3.2.1 client 실행 코드

def send_request(self):
        service_request = ArithmeticOperator.Request()
        service_request.arithmetic_operator = random.randint(1, 4)
        futures = self.arithmetic_service_client.call_async(service_request)
        return futures
  • service_request.arithmetic_operator = random.randint(1, 4)
    : arithmetic_operator 필드에 1~4의 무작위 값 할당
  • call_async: 비동기 방식으로 요청 전송
  • futures: 서비스 상태 및 응답값을 담은 객체

3.3 main

def main(args=None):
    rclpy.init(args=args)
    operator = Operator()
    future = operator.send_request()
    user_trigger = True
    
try:
        while rclpy.ok():
            if user_trigger is True:
                rclpy.spin_once(operator)
                if future.done():
                    try:
                        service_response = future.result()
                    except Exception as e:  # noqa: B902
                        operator.get_logger().warn('Service call failed: {}'.format(str(e)))
                    else:
                        operator.get_logger().info(
                            'Result: {}'.format(service_response.arithmetic_result))
                        user_trigger = False
            else:
                input('Press Enter for next service call.')
                future = operator.send_request()
                user_trigger = True
  • Operator 클래스 --> operator로 생성

  • future = operator.send_request(): 서비스 요청을 보내고 응답값을 받음

  • spin_once 함수로 콜백함수 한번실행 후 이에 대한 서비스 응답값을 받았을 경우, 즉 future의 상태값이 done()일 경우 service_respoense에 저장하여 연산에 사용한다.

  • 단, 서비스 클라이언트는 실행한 후 종료되는 컨셉으로 토픽처럼 지속적인 수행은 하지 않는다.

  • 해당 코드에서만 user_trigger와 input을 이용하여 처음 노드 실행 시 최초 1회에 한해 바로 임의의 연산자를 서비스 요청값으로 서비스 서버에게 전달하여 노드가 종료되기 전가지 사용자의 입력값을 다시 받아 다시 임의의 연산자를 랜덤으로 골라 해당 연산자를 서비스 요청값으로 보내게 된다.


2. service server - calculator.py

2.1 서버 선언

  • arithmetic_service_server는 서비스 서버로 Node 클래스의 create_service 함수를 이용하여 서비스 서버로 선언
self.arithmetic_service_server = self.create_service(
            ArithmeticOperator,
            'arithmetic_operator',
            self.get_arithmetic_operator,
            callback_group=self.callback_group)

2.2 서비스 요청 수행 코드 - 콜백함수

def get_arithmetic_operator(self, request, response):
        self.argument_operator = request.arithmetic_operator
        self.argument_result = self.calculate_given_formula(
            self.argument_a,
            self.argument_b,
            self.argument_operator)
        response.arithmetic_result = self.argument_result
        self.argument_formula = '{0} {1} {2} = {3}'.format(
                self.argument_a,
                self.operator[self.argument_operator-1],
                self.argument_b,
                self.argument_result)
        self.get_logger().info(self.argument_formula)
        return response
  • calculate_given_formula 함수로부터 받은 연산 결과값은 response.arithmetic_result에 저장하고 끝으로 관련 수식을 문자열로 표현하여 get_logger().info() 함수를 통해 화면에 표시하고 있다
def calculate_given_formula(self, a, b, operator):
        if operator == ArithmeticOperator.Request.PLUS:
            self.argument_result = a + b
        elif operator == ArithmeticOperator.Request.MINUS:
            self.argument_result = a - b
        elif operator == ArithmeticOperator.Request.MULTIPLY:
            self.argument_result = a * b
        elif operator == ArithmeticOperator.Request.DIVISION:
            try:
                self.argument_result = a / b
            except ZeroDivisionError:
                self.get_logger().error('ZeroDivisionError!')
                self.argument_result = 0.0
                return self.argument_result
        else:
            self.get_logger().error(
                'Please make sure arithmetic operator(plus, minus, multiply, division).')
            self.argument_result = 0.0
        return self.argument_result

일반화 코드

  • client 코드
# 필수 import
import rclpy
from rclpy.node import Node

# srv 파일을 import (패키지 이름과 서비스 파일 이름을 명확히 해야 함)
from <pkg_name>.srv import <SrvFileName>  # ex. from my_package.srv import AddTwoInts

# QoS는 서비스에서 사용되지 않음 -> 서비스는 요청-응답 기반의 단일 통신

class <ClassName>(Node):  # 클래스 이름 정의
    def __init__(self):
        super().__init__('client_node_name')  # 클라이언트 노드 이름 설정
        # 서비스 클라이언트 생성 (서비스 타입과 서비스 이름 필요)
        self.client_ = self.create_client(<SrvFileName>, 'service_name')

        # 서비스가 준비될 때까지 기다림 (비동기 통신이므로 서비스가 실행 중일 때까지 대기)
        while not self.client_.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('Service not available, waiting...')
    
    # 서비스 요청을 처리하는 콜백 함수
    def client_callback_function(self, request_data):
        # 요청 객체 생성
        request = <SrvFileName>.Request()
        request.<request_field> = request_data  # ex. request.a = 5

        # 비동기 방식으로 서비스 요청
        future = self.client_.call_async(request)  
        rclpy.spin_until_future_complete(self, future)  # 응답이 올 때까지 대기

        # 응답 처리
        try:
            response = future.result()  # 비동기 응답 결과를 가져옴
            self.get_logger().info('Result: %s' % str(response.<response_field>))  # 응답 결과 출력
        except Exception as e:
            self.get_logger().error('Service call failed: %r' % (e,))  # 예외 처리
        return future
    
def main(args=None):
    rclpy.init(args=args)  # ROS 2 초기화
    client_node = <ClassName>()  # 클라이언트 클래스 인스턴스 생성

    # 서비스에 요청할 데이터 (예: 랜덤 숫자, 사용자 입력 등)
    request_data = <some_data>  # 예: 5
    
    # 요청 전송 함수 호출
    client_node.client_callback_function(request_data)

    client_node.destroy_node()  # 노드 종료
    rclpy.shutdown()  # ROS 2 시스템 종료

if __name__ == '__main__':
    main()
  • server 코드
# 필수 import
import rclpy
from rclpy.node import Node

# srv 파일을 import (패키지 이름과 서비스 파일 이름을 명확히 해야 함)
from <pkg_name>.srv import <SrvFileName> 

class <ClassName>(Node):  # 클래스 이름 정의
    def __init__(self):
        super().__init__('server_node_name')  # 서버 노드 이름 설정
        # 서비스 서버 생성 (서비스 타입과 서비스 이름 필요)
        self.service_ = self.create_service(
            <SrvFileName>,  # 서비스 타입
            'service_name',  # 서비스 이름
            self.service_callback_function  # 콜백 함수로 서비스 요청 처리
        )
        self.get_logger().info('Service server ready and waiting for requests...')

    # 서비스 요청을 처리하는 콜백 함수
    def service_callback_function(self, request, response):
        self.get_logger().info('Received request: %d' % request.<request_field>)  # 요청 데이터 출력
        
        # 요청 처리 (연산 등)
        if request.<request_field> == <SrvFileName>.Request.PLUS:
            response.<response_field> = request.a + request.b  # 덧셈 처리 예시
        elif request.<request_field> == <SrvFileName>.Request.MINUS:
            response.<response_field> = request.a - request.b  # 뺄셈 처리 예시
        elif request.<request_field> == <SrvFileName>.Request.MULTIPLY:
            response.<response_field> = request.a * request.b  # 곱셈 처리 예시
        elif request.<request_field> == <SrvFileName>.Request.DIVISION:
            if request.b != 0:
                response.<response_field> = request.a / request.b  # 나눗셈 처리 예시
            else:
                response.<response_field> = float('inf')  # 0으로 나눌 때 무한대 처리
        else:
            self.get_logger().error('Invalid operation requested!')  # 잘못된 연산자 처리

        self.get_logger().info('Sending response: %f' % response.<response_field>)  # 응답 데이터 출력
        return response  # 응답 반환

def main(args=None):
    rclpy.init(args=args)  # ROS 2 초기화
    service_node = <ClassName>()  # 서비스 노드 인스턴스 생성
    rclpy.spin(service_node)  # 노드 실행 및 서비스 요청 대기
    service_node.destroy_node()  # 노드 종료
    rclpy.shutdown()  # ROS 2 시스템 종료

if __name__ == '__main__':
    main()
  • 중요한 것은 .srv의 데이터 타입과 코드 내의 형식을 일치시키는 것
  • service는 단기성 메세지를 발행하므로 누를때마다 실행되도록 코드 작성이 필요하다.
profile
공부하는 거북이

0개의 댓글