[ROS2] Service 통신 - Client & Server 생성

TaerinLog·2025년 5월 28일

Service

  • 노드간 통신을 위한 방식 중 하나
  • 요청(Request) <-> 응답(Response)으로 이루어진 n(client):1(server) 통신 방식
    ㄴ Service Server: 요청을 받아 처리하고 응답을 반환
    ㄴ Service Client: 요청을 보내고 응답을 기다림
  • 동기(call)/비동기(call_async) 통신 설정 가능

    동기 서비스 클라이언트는 교착 상태에 취약하지만, 교착 상태 발생 시 문제를 알려주지는 않습니다.

명령어

1. 서비스 목록

  • 존재하는 서비스 리스트를 볼 수 있음
    ros2 service list

2. 서비스 타입 확인

  • 서비스가 사용하는 서비스 타입 확인
    ros2 service type [service_name]

3. 인터페이스 목록

  • ros2 interface list

    ㄴ 표준 서비스는 제한된 구조를 가지므로, 필요 시 사용자 정의 메시지로 확장하는 것이 일반적

4. 인터페이스 데이터 구조

  • .srv파일 구조
  • 서버의 call_back 함수에서 request파라메터는 위 영역, response파라메터는 아래영역으로 나뉘어 들어감
# request 영역
int64 a
int64 b
---
# response 영역
int64 sum

5. 서비스 호출

  • 서버에 요청 보내기
    ㄴ 클라이언트 노드가 없어도 서버가 오픈되어 있다면 아래 명령어로 요청 가능
    ros2 service call /add_two_ints example_interfaces/srv/AddTwoInts "{a: 7, b: 8}"

명령어 요약

명령어설명
ros2 service list현재 활성화된 모든 서비스 목록 보기
ros2 service type [서비스 이름]해당 서비스의 타입 확인
ros2 service find [서비스 타입]해당 타입의 서비스를 제공하는 서버 목록 출력
ros2 interface show [서비스 타입]서비스의 요청/응답 구조 확인
ros2 service call [서비스 이름] [서비스 타입] "{요청 데이터}"해당 서비스에 직접 요청 보내기

Server & Client 생성

  • 클라이언트가 그리퍼에 대한 동작 요청
  • 서버가 그리퍼에 대한 요청에 대해 처리

1. Server & Client 노드 작성

  • server
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
from trajectory_msgs.msg import JointTrajectory, JointTrajectoryPoint

class GripperServiceServer(Node):
    def __init__(self):
        super().__init__('gripper_service_server')

        # 서비스 서버 오픈
        self.srv = self.create_service(SetBool, 'gripper_control', self.call_back)
        # gripper control publisher 생성
        self.publisher = self.create_publisher(JointTrajectory, '/gripper_controller/command', 10)
        self.get_logger().info('Gripper service server ready.')


    def call_back(self, request, response):
        ###--- service 요청 받음 ---### 
        # True=open, False=close
        open_gripper = request.data 

        ###--- service 요청에 따라 그리퍼를 여닫는 동작을 수행 ---###
        traj = JointTrajectory()
        # 어느 관절을 움직일지 명시
        traj.joint_names = ['gripper_left_finger_joint', 'gripper_right_finger_joint']

        point = JointTrajectoryPoint()
        # 얼만큼 벌릴지 명시
        point.positions = [0.035, 0.035] if open_gripper else [0.0, 0.0]
        point.time_from_start.sec = 1
        traj.points.append(point)

        # 관절 제어 토픽을 발행
        self.publisher.publish(traj)
        ###--- 동작 끝 ---###

        ###--- 동작을 수행한 후, 그 결과를 서비스 service 응답 반환---###  
        response.success = True
        response.message = 'Gripper opened' if open_gripper else 'Gripper closed'
        self.get_logger().info(response.message)

        return response
        
    
def main():
    rclpy.init()
    node = GripperServiceServer()
    rclpy.spin(node)
    node.destroy_node()
    rclpy.shutdown()
  • client
import rclpy
from rclpy.node import Node
from std_srvs.srv import SetBool
import time

class GripperServiceClient(Node):
    def __init__(self):
        super().__init__('gripper_service_client')
        self.client = self.create_client(SetBool,'gripper_control')

        while not self.client.wait_for_service(timeout_sec=1.0):
            self.get_logger().info('service not available, waiting again...')

        self.req = SetBool.Request()

    def send_request(self):
        self.response = self.client.call_async(self.req)

def main(args=None):
    rclpy.init(args=args)
    client = GripperServiceClient()
    client.req.data = True
    client.send_request()

    # ROS가 동작중이면 계속 실행
    while rclpy.ok():
        rclpy.spin_once(client, timeout_sec=0.1)
        # 응답이 오면 요청 결과값 출력 후 중단
        if client.response.done():
            try:
                res = client.response.result()
            except Exception as e :
                client.get_logger().info('service call fail %r' % (e,))
            else:
                client.get_logger().info(f"response: {res.success}, {res.message}")
            

        client.get_logger().info('call_async can doing another work ... ')
    
    client.destroy_node()
    rclpy.shutdown()

2. setup.py

  • entry_point 추가
    entry_points={
        'console_scripts': [
            'service_server = test_package.service_server:main',
            'service_client = test_package.service_client:main',
        ],
    },

3. 패키지 빌드 및 노드 실행

  • 패키지 빌드
    colcon build --symlink-install --packages-select test_package
  • 노드 실행
    ㄴ 서버
    ros2 run test_package service_server

    ㄴ 클라이언트
    ros2 run test_package service_client

새로운 interface 생성

# new_srv_data.srv
bool data
float64 linear
---
bool success
string message
  • CMakeLists.txt에 srv 추가
rosidl_generate_interfaces(${PROJECT_NAME}
  "srv/new_srv_data.srv" 
)

or

set(srv_files "srv/new_srv_data.srv")
rosidl_generate_interfaces(${PROJECT_NAME}
  ${srv_files}
)

# 여러개 추가시
set(srv_files
  "srv/NewSrvData.srv"
  "srv/AnotherSrv.srv"
  "srv/ExampleRequest.srv"
)
  • 패키지 빌드 후 서비스 생성 확인
    ros2 interface show [패키지명]/srv/new_srv_data
profile
taerin

0개의 댓글