
ex_calculator 패키지에서는 연산자를 임의 선택 후 전송하고 기존 저장된 변수 a,b를 계산하여 결과값을 서비스 응답값으로 전달하는 프로그램을 구성한다.
'''service'''
$ ros2 service list # 서비스 목록 확인
$ ros2 service type [서비스 이름] # 서비스의 타입 확인
$ ros2 service call [서비스 이름][서비스 타입]"{값}" # 특정 서비스르 호출
$ ros2 service call /reset std_srvs/srv/Empty
$ ros2 service call /turtle1/set_pen turtlesim/srv/SetPen "{r: 255, g: 255, width: 10}"
$ ros2 service call /kill turtlesim/srv/Kill "name: 'turtle1'"
$ ros2 service call /spawn turtlesim/srv/Spawn "{x: 5.5, y: 9.0, theta: 1.57, name: 'leonardo'}"
$ ros2 service find [서비스 타입] # 지정 서비스 타입의 서비스 출력
import random
from ros_study_msgs.srv import ArithmeticOperator
import rclpy
from rclpy.node import Node
operator이름으로 노드 정의create_client로 client 생성arithmetic_operator 이름의 서비스를 호출하는 클라이언트 생성ArithmeticOperator 타입의 요청을 받고 응답하는 서버와 통신class Operator(Node):
def __init__(self):
super().__init__('operator')
self.arithmetic_service_client = self.create_client(
ArithmeticOperator,
'arithmetic_operator')
while not self.arithmetic_service_client.wait_for_service(timeout_sec=0.1):
self.get_logger().warning('The arithmetic_operator service not available.')
def send_request(self):
service_request = ArithmeticOperator.Request()
service_request.arithmetic_operator = random.randint(1, 4)
futures = self.arithmetic_service_client.call_async(service_request)
return futures
service_request.arithmetic_operator = random.randint(1, 4)call_async: 비동기 방식으로 요청 전송futures: 서비스 상태 및 응답값을 담은 객체def main(args=None):
rclpy.init(args=args)
operator = Operator()
future = operator.send_request()
user_trigger = True
try:
while rclpy.ok():
if user_trigger is True:
rclpy.spin_once(operator)
if future.done():
try:
service_response = future.result()
except Exception as e: # noqa: B902
operator.get_logger().warn('Service call failed: {}'.format(str(e)))
else:
operator.get_logger().info(
'Result: {}'.format(service_response.arithmetic_result))
user_trigger = False
else:
input('Press Enter for next service call.')
future = operator.send_request()
user_trigger = True
Operator 클래스 --> operator로 생성
future = operator.send_request(): 서비스 요청을 보내고 응답값을 받음
spin_once 함수로 콜백함수 한번실행 후 이에 대한 서비스 응답값을 받았을 경우, 즉 future의 상태값이 done()일 경우 service_respoense에 저장하여 연산에 사용한다.
단, 서비스 클라이언트는 실행한 후 종료되는 컨셉으로 토픽처럼 지속적인 수행은 하지 않는다.
해당 코드에서만 user_trigger와 input을 이용하여 처음 노드 실행 시 최초 1회에 한해 바로 임의의 연산자를 서비스 요청값으로 서비스 서버에게 전달하여 노드가 종료되기 전가지 사용자의 입력값을 다시 받아 다시 임의의 연산자를 랜덤으로 골라 해당 연산자를 서비스 요청값으로 보내게 된다.
arithmetic_service_server는 서비스 서버로 Node 클래스의 create_service 함수를 이용하여 서비스 서버로 선언self.arithmetic_service_server = self.create_service(
ArithmeticOperator,
'arithmetic_operator',
self.get_arithmetic_operator,
callback_group=self.callback_group)
def get_arithmetic_operator(self, request, response):
self.argument_operator = request.arithmetic_operator
self.argument_result = self.calculate_given_formula(
self.argument_a,
self.argument_b,
self.argument_operator)
response.arithmetic_result = self.argument_result
self.argument_formula = '{0} {1} {2} = {3}'.format(
self.argument_a,
self.operator[self.argument_operator-1],
self.argument_b,
self.argument_result)
self.get_logger().info(self.argument_formula)
return response
def calculate_given_formula(self, a, b, operator):
if operator == ArithmeticOperator.Request.PLUS:
self.argument_result = a + b
elif operator == ArithmeticOperator.Request.MINUS:
self.argument_result = a - b
elif operator == ArithmeticOperator.Request.MULTIPLY:
self.argument_result = a * b
elif operator == ArithmeticOperator.Request.DIVISION:
try:
self.argument_result = a / b
except ZeroDivisionError:
self.get_logger().error('ZeroDivisionError!')
self.argument_result = 0.0
return self.argument_result
else:
self.get_logger().error(
'Please make sure arithmetic operator(plus, minus, multiply, division).')
self.argument_result = 0.0
return self.argument_result
# 필수 import
import rclpy
from rclpy.node import Node
# srv 파일을 import (패키지 이름과 서비스 파일 이름을 명확히 해야 함)
from <pkg_name>.srv import <SrvFileName> # ex. from my_package.srv import AddTwoInts
# QoS는 서비스에서 사용되지 않음 -> 서비스는 요청-응답 기반의 단일 통신
class <ClassName>(Node): # 클래스 이름 정의
def __init__(self):
super().__init__('client_node_name') # 클라이언트 노드 이름 설정
# 서비스 클라이언트 생성 (서비스 타입과 서비스 이름 필요)
self.client_ = self.create_client(<SrvFileName>, 'service_name')
# 서비스가 준비될 때까지 기다림 (비동기 통신이므로 서비스가 실행 중일 때까지 대기)
while not self.client_.wait_for_service(timeout_sec=1.0):
self.get_logger().info('Service not available, waiting...')
# 서비스 요청을 처리하는 콜백 함수
def client_callback_function(self, request_data):
# 요청 객체 생성
request = <SrvFileName>.Request()
request.<request_field> = request_data # ex. request.a = 5
# 비동기 방식으로 서비스 요청
future = self.client_.call_async(request)
rclpy.spin_until_future_complete(self, future) # 응답이 올 때까지 대기
# 응답 처리
try:
response = future.result() # 비동기 응답 결과를 가져옴
self.get_logger().info('Result: %s' % str(response.<response_field>)) # 응답 결과 출력
except Exception as e:
self.get_logger().error('Service call failed: %r' % (e,)) # 예외 처리
return future
def main(args=None):
rclpy.init(args=args) # ROS 2 초기화
client_node = <ClassName>() # 클라이언트 클래스 인스턴스 생성
# 서비스에 요청할 데이터 (예: 랜덤 숫자, 사용자 입력 등)
request_data = <some_data> # 예: 5
# 요청 전송 함수 호출
client_node.client_callback_function(request_data)
client_node.destroy_node() # 노드 종료
rclpy.shutdown() # ROS 2 시스템 종료
if __name__ == '__main__':
main()
# 필수 import
import rclpy
from rclpy.node import Node
# srv 파일을 import (패키지 이름과 서비스 파일 이름을 명확히 해야 함)
from <pkg_name>.srv import <SrvFileName>
class <ClassName>(Node): # 클래스 이름 정의
def __init__(self):
super().__init__('server_node_name') # 서버 노드 이름 설정
# 서비스 서버 생성 (서비스 타입과 서비스 이름 필요)
self.service_ = self.create_service(
<SrvFileName>, # 서비스 타입
'service_name', # 서비스 이름
self.service_callback_function # 콜백 함수로 서비스 요청 처리
)
self.get_logger().info('Service server ready and waiting for requests...')
# 서비스 요청을 처리하는 콜백 함수
def service_callback_function(self, request, response):
self.get_logger().info('Received request: %d' % request.<request_field>) # 요청 데이터 출력
# 요청 처리 (연산 등)
if request.<request_field> == <SrvFileName>.Request.PLUS:
response.<response_field> = request.a + request.b # 덧셈 처리 예시
elif request.<request_field> == <SrvFileName>.Request.MINUS:
response.<response_field> = request.a - request.b # 뺄셈 처리 예시
elif request.<request_field> == <SrvFileName>.Request.MULTIPLY:
response.<response_field> = request.a * request.b # 곱셈 처리 예시
elif request.<request_field> == <SrvFileName>.Request.DIVISION:
if request.b != 0:
response.<response_field> = request.a / request.b # 나눗셈 처리 예시
else:
response.<response_field> = float('inf') # 0으로 나눌 때 무한대 처리
else:
self.get_logger().error('Invalid operation requested!') # 잘못된 연산자 처리
self.get_logger().info('Sending response: %f' % response.<response_field>) # 응답 데이터 출력
return response # 응답 반환
def main(args=None):
rclpy.init(args=args) # ROS 2 초기화
service_node = <ClassName>() # 서비스 노드 인스턴스 생성
rclpy.spin(service_node) # 노드 실행 및 서비스 요청 대기
service_node.destroy_node() # 노드 종료
rclpy.shutdown() # ROS 2 시스템 종료
if __name__ == '__main__':
main()